From 44299c7221ab96befa427764cd3be97421799b3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kochma=C5=84ski?= Date: Mon, 11 May 2020 18:16:56 +0200 Subject: [PATCH] contrib: serve-event: make serve-event multithreading save Only call handlers established in the current thread and use atomic operations to update *descriptor-handlers*. Closes #588. Additionally: - improve the test code - add a test for the leak - provide internet machine link for the tutorial --- contrib/serve-event/event-test-async.lisp | 31 +++++++++ contrib/serve-event/event-test.lisp | 24 ++++--- contrib/serve-event/serve-event.lisp | 81 ++++++++++++++--------- 3 files changed, 94 insertions(+), 42 deletions(-) create mode 100644 contrib/serve-event/event-test-async.lisp diff --git a/contrib/serve-event/event-test-async.lisp b/contrib/serve-event/event-test-async.lisp new file mode 100644 index 00000000..5c13eade --- /dev/null +++ b/contrib/serve-event/event-test-async.lisp @@ -0,0 +1,31 @@ +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;; Test that serve-event doesn't leak its handlers to other threads +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(require 'serve-event) + +(defun test-leak (&aux exit) + (let ((out *standard-output*)) + (print "Press enter." out) + (let* ((p1 (mp:process-run-function + 'stdin-2 + (lambda () + (serve-event:with-fd-handler + (0 :input #'(lambda (fd) + (declare (ignore fd)) + (format out "WRONG!~%"))) + (sleep most-positive-fixnum))))) + (p2 (mp:process-run-function + 'stdin-1 + (lambda () + (serve-event:with-fd-handler + (0 :input #'(lambda (fd) + (declare (ignore fd)) + (format out"GOOD!~%"))) + (unwind-protect (serve-event:serve-event) + (mp:interrupt-process p1 (lambda () + (mp:exit-process))))))))) + (mp:process-join p1) + (mp:process-join p2)))) + +(test-leak) diff --git a/contrib/serve-event/event-test.lisp b/contrib/serve-event/event-test.lisp index d80261f1..e1c7bfb6 100644 --- a/contrib/serve-event/event-test.lisp +++ b/contrib/serve-event/event-test.lisp @@ -4,16 +4,20 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (require 'serve-event) -(in-package "SERVE-EVENT") -(defun test-stdin () - (format t "DOING STDIN~%") - (with-fd-handler (0 :input #'(lambda (fd) (declare (ignore fd)) - (format t "Got data~%") - (read-char))) - (loop ;; FIXME: End condition - (format t "Entering serve-all-events...~%")(force-output) - (serve-all-events 5) - (format t "Events served~%")))) +(defun test-stdin (&aux exit) + (format t "DOING STDIN. Type Q to exit.~%") + (serve-event:with-fd-handler + (0 :input #'(lambda (fd) + (declare (ignore fd)) + (let ((ch (read-char))) + (format t "Got data ~s~%" ch) + (when (char= ch #\Q) + (setf exit t))))) + (loop until exit + do (format t "Entering serve-all-events...~%") + (force-output) + (serve-event:serve-all-events 5) + (format t "Events served~%")))) (test-stdin) diff --git a/contrib/serve-event/serve-event.lisp b/contrib/serve-event/serve-event.lisp index 204b5d68..528daad0 100644 --- a/contrib/serve-event/serve-event.lisp +++ b/contrib/serve-event/serve-event.lisp @@ -1,11 +1,10 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; -;; This file provides a port the SBCL/CMUCL 'serve-event' -;; functionality to ecl. serve-event provides a lispy abstraction of -;; unix select(2) non-blocking IO (and potentially other variants such as -;; epoll). It works with Unix-level file-descriptors, which can be -;; retrieved from the sockets module using the socket-file-descriptor -;; slot. +;; This file provides a port of the SBCL/CMUCL 'serve-event' extension +;; to ECL. serve-event provides a lispy abstraction of unix select(2) +;; non-blocking IO (and potentially other variants such as epoll). It +;; works with Unix-level file-descriptors, which can be retrieved from +;; the sockets module using the socket-file-descriptor slot. ;; ;; As this file is based on SBCL's serve-event module it is being ;; released under the same (non) license as SBCL (i.e. public-domain). @@ -16,29 +15,33 @@ ;; Test Example ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; -;; (defun test-stdin () -;; (format t "DOING STDIN~%") -;; (with-fd-handler (0 :input #'(lambda (fd) (declare (ignore fd)) -;; (format t "Got data~%") -;; (read-char))) -;; (loop ;; FIXME: End condition -;; (format t "Entering serve-all-events...~%")(force-output) -;; (serve-all-events 5) -;; (format t "Events served~%")))) +;; (defun test-stdin (&aux exit) +;; (format t "DOING STDIN. Type Q to exit.~%") +;; (serve-event:with-fd-handler +;; (0 :input #'(lambda (fd) +;; (declare (ignore fd)) +;; (let ((ch (read-char))) +;; (format t "Got data ~s~%" ch) +;; (when (char= ch #\Q) +;; (setf exit t))))) +;; (loop until exit +;; do (format t "Entering serve-all-events...~%") +;; (force-output) +;; (serve-event:serve-all-events 5) +;; (format t "Events served~%")))) ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; ;; A more advanced example using sockets is available here: ;; -;; http://haltcondition.net/?p=2232 +;; https://web.archive.org/web/20161203154152/http://haltcondition.net/?p=2232 ;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defpackage "SERVE-EVENT" - (:use "CL" "FFI") - (:export "WITH-FD-HANDLER" "ADD-FD-HANDLER" "REMOVE-FD-HANDLER" - "SERVE-EVENT" "SERVE-ALL-EVENTS")) -(in-package "SERVE-EVENT") +(defpackage #:serve-event + (:use #:cl #:ffi) + (:export #:with-fd-handler #:add-fd-handler #:remove-fd-handler + #:serve-event #:serve-all-events)) +(in-package #:serve-event) (clines "#include " @@ -65,11 +68,12 @@ ;; FIXME: Should be based on FD_SETSIZE (descriptor 0) ;; Function to call. - (function nil :type function)) - + (function nil :type function) + ;; thread in which the handler was established + #+threads + (thread mp:*current-process*)) (defvar *descriptor-handlers* nil - #!+sb-doc "List of all the currently active handlers for file descriptors") (defun coerce-to-descriptor (stream-or-fd direction) @@ -85,6 +89,16 @@ #+clos-streams (stream (gray::stream-file-descriptor stream-or-fd direction)))) + +;;; serve-event calls only handlers which are established in the current thread +(defmacro do-handlers ((handler-symbol handler-list) &body body) + `(dolist (,handler-symbol ,handler-list) + #+threads + (when (eq mp:*current-process* (handler-thread ,handler-symbol)) + ,@body) + #-threads + ,@body)) + ;;; Add a new handler to *descriptor-handlers*. (defun add-fd-handler (stream-or-fd direction function) "Arrange to call FUNCTION whenever the fd designated by STREAM-OR-FD @@ -101,15 +115,18 @@ (let ((handler (make-handler (coerce-to-descriptor stream-or-fd direction) direction function))) - (push handler *descriptor-handlers*) + #+threads (mp:atomic-push handler *descriptor-handlers*) + #-threads (push handler *descriptor-handlers*) handler)) ;;; Remove an old handler from *descriptor-handlers*. (defun remove-fd-handler (handler) - #!+sb-doc "Removes HANDLER from the list of active handlers." - (setf *descriptor-handlers* - (delete handler *descriptor-handlers*))) + #+threads (mp:atomic-update *descriptor-handlers* + #'(lambda (all-handlers) + (remove handler all-handlers))) + #-threads (setf *descriptor-handlers* + (delete handler *descriptor-handlers*))) ;;; Add the handler to *descriptor-handlers* for the duration of BODY. (defmacro with-fd-handler ((fd direction function) &rest body) @@ -166,13 +183,13 @@ (let ((maxfd 0)) ;; Load the descriptors into the relevant set - (dolist (handler *descriptor-handlers*) + (do-handlers (handler *descriptor-handlers*) (let ((fd (handler-descriptor handler))) (ecase (handler-direction handler) (:input (fd-set fd rfd)) (:output (fd-set fd wfd))) (when (> fd maxfd) - (setf maxfd fd)))) + (setf maxfd fd)))) (multiple-value-bind (retval errno) (if (null seconds) @@ -207,7 +224,7 @@ ;; otherwise error (error "Error during select"))) ((plusp retval) - (dolist (handler *descriptor-handlers*) + (do-handlers (handler *descriptor-handlers*) (let ((fd (handler-descriptor handler))) (if (plusp (ecase (handler-direction handler) (:input (fd-isset fd rfd))