contrib: serve-event: make serve-event multithreading save
Only call handlers established in the current thread and use atomic operations to update *descriptor-handlers*. Closes #588. Additionally: - improve the test code - add a test for the leak - provide internet machine link for the tutorial
This commit is contained in:
parent
f9db80dcbf
commit
44299c7221
3 changed files with 94 additions and 42 deletions
31
contrib/serve-event/event-test-async.lisp
Normal file
31
contrib/serve-event/event-test-async.lisp
Normal file
|
|
@ -0,0 +1,31 @@
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;;; Test that serve-event doesn't leak its handlers to other threads
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
(require 'serve-event)
|
||||||
|
|
||||||
|
(defun test-leak (&aux exit)
|
||||||
|
(let ((out *standard-output*))
|
||||||
|
(print "Press enter." out)
|
||||||
|
(let* ((p1 (mp:process-run-function
|
||||||
|
'stdin-2
|
||||||
|
(lambda ()
|
||||||
|
(serve-event:with-fd-handler
|
||||||
|
(0 :input #'(lambda (fd)
|
||||||
|
(declare (ignore fd))
|
||||||
|
(format out "WRONG!~%")))
|
||||||
|
(sleep most-positive-fixnum)))))
|
||||||
|
(p2 (mp:process-run-function
|
||||||
|
'stdin-1
|
||||||
|
(lambda ()
|
||||||
|
(serve-event:with-fd-handler
|
||||||
|
(0 :input #'(lambda (fd)
|
||||||
|
(declare (ignore fd))
|
||||||
|
(format out"GOOD!~%")))
|
||||||
|
(unwind-protect (serve-event:serve-event)
|
||||||
|
(mp:interrupt-process p1 (lambda ()
|
||||||
|
(mp:exit-process)))))))))
|
||||||
|
(mp:process-join p1)
|
||||||
|
(mp:process-join p2))))
|
||||||
|
|
||||||
|
(test-leak)
|
||||||
|
|
@ -4,16 +4,20 @@
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(require 'serve-event)
|
(require 'serve-event)
|
||||||
(in-package "SERVE-EVENT")
|
|
||||||
|
|
||||||
(defun test-stdin ()
|
(defun test-stdin (&aux exit)
|
||||||
(format t "DOING STDIN~%")
|
(format t "DOING STDIN. Type Q to exit.~%")
|
||||||
(with-fd-handler (0 :input #'(lambda (fd) (declare (ignore fd))
|
(serve-event:with-fd-handler
|
||||||
(format t "Got data~%")
|
(0 :input #'(lambda (fd)
|
||||||
(read-char)))
|
(declare (ignore fd))
|
||||||
(loop ;; FIXME: End condition
|
(let ((ch (read-char)))
|
||||||
(format t "Entering serve-all-events...~%")(force-output)
|
(format t "Got data ~s~%" ch)
|
||||||
(serve-all-events 5)
|
(when (char= ch #\Q)
|
||||||
(format t "Events served~%"))))
|
(setf exit t)))))
|
||||||
|
(loop until exit
|
||||||
|
do (format t "Entering serve-all-events...~%")
|
||||||
|
(force-output)
|
||||||
|
(serve-event:serve-all-events 5)
|
||||||
|
(format t "Events served~%"))))
|
||||||
|
|
||||||
(test-stdin)
|
(test-stdin)
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;;
|
;;
|
||||||
;; This file provides a port the SBCL/CMUCL 'serve-event'
|
;; This file provides a port of the SBCL/CMUCL 'serve-event' extension
|
||||||
;; functionality to ecl. serve-event provides a lispy abstraction of
|
;; to ECL. serve-event provides a lispy abstraction of unix select(2)
|
||||||
;; unix select(2) non-blocking IO (and potentially other variants such as
|
;; non-blocking IO (and potentially other variants such as epoll). It
|
||||||
;; epoll). It works with Unix-level file-descriptors, which can be
|
;; works with Unix-level file-descriptors, which can be retrieved from
|
||||||
;; retrieved from the sockets module using the socket-file-descriptor
|
;; the sockets module using the socket-file-descriptor slot.
|
||||||
;; slot.
|
|
||||||
;;
|
;;
|
||||||
;; As this file is based on SBCL's serve-event module it is being
|
;; As this file is based on SBCL's serve-event module it is being
|
||||||
;; released under the same (non) license as SBCL (i.e. public-domain).
|
;; released under the same (non) license as SBCL (i.e. public-domain).
|
||||||
|
|
@ -16,29 +15,33 @@
|
||||||
;; Test Example
|
;; Test Example
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;;
|
;;
|
||||||
;; (defun test-stdin ()
|
;; (defun test-stdin (&aux exit)
|
||||||
;; (format t "DOING STDIN~%")
|
;; (format t "DOING STDIN. Type Q to exit.~%")
|
||||||
;; (with-fd-handler (0 :input #'(lambda (fd) (declare (ignore fd))
|
;; (serve-event:with-fd-handler
|
||||||
;; (format t "Got data~%")
|
;; (0 :input #'(lambda (fd)
|
||||||
;; (read-char)))
|
;; (declare (ignore fd))
|
||||||
;; (loop ;; FIXME: End condition
|
;; (let ((ch (read-char)))
|
||||||
;; (format t "Entering serve-all-events...~%")(force-output)
|
;; (format t "Got data ~s~%" ch)
|
||||||
;; (serve-all-events 5)
|
;; (when (char= ch #\Q)
|
||||||
;; (format t "Events served~%"))))
|
;; (setf exit t)))))
|
||||||
|
;; (loop until exit
|
||||||
|
;; do (format t "Entering serve-all-events...~%")
|
||||||
|
;; (force-output)
|
||||||
|
;; (serve-event:serve-all-events 5)
|
||||||
|
;; (format t "Events served~%"))))
|
||||||
;;
|
;;
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;;
|
;;
|
||||||
;; A more advanced example using sockets is available here:
|
;; A more advanced example using sockets is available here:
|
||||||
;;
|
;;
|
||||||
;; http://haltcondition.net/?p=2232
|
;; https://web.archive.org/web/20161203154152/http://haltcondition.net/?p=2232
|
||||||
;;
|
;;
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
(defpackage #:serve-event
|
||||||
(defpackage "SERVE-EVENT"
|
(:use #:cl #:ffi)
|
||||||
(:use "CL" "FFI")
|
(:export #:with-fd-handler #:add-fd-handler #:remove-fd-handler
|
||||||
(:export "WITH-FD-HANDLER" "ADD-FD-HANDLER" "REMOVE-FD-HANDLER"
|
#:serve-event #:serve-all-events))
|
||||||
"SERVE-EVENT" "SERVE-ALL-EVENTS"))
|
(in-package #:serve-event)
|
||||||
(in-package "SERVE-EVENT")
|
|
||||||
|
|
||||||
(clines
|
(clines
|
||||||
"#include <errno.h>"
|
"#include <errno.h>"
|
||||||
|
|
@ -65,11 +68,12 @@
|
||||||
;; FIXME: Should be based on FD_SETSIZE
|
;; FIXME: Should be based on FD_SETSIZE
|
||||||
(descriptor 0)
|
(descriptor 0)
|
||||||
;; Function to call.
|
;; Function to call.
|
||||||
(function nil :type function))
|
(function nil :type function)
|
||||||
|
;; thread in which the handler was established
|
||||||
|
#+threads
|
||||||
|
(thread mp:*current-process*))
|
||||||
|
|
||||||
(defvar *descriptor-handlers* nil
|
(defvar *descriptor-handlers* nil
|
||||||
#!+sb-doc
|
|
||||||
"List of all the currently active handlers for file descriptors")
|
"List of all the currently active handlers for file descriptors")
|
||||||
|
|
||||||
(defun coerce-to-descriptor (stream-or-fd direction)
|
(defun coerce-to-descriptor (stream-or-fd direction)
|
||||||
|
|
@ -85,6 +89,16 @@
|
||||||
#+clos-streams
|
#+clos-streams
|
||||||
(stream (gray::stream-file-descriptor stream-or-fd direction))))
|
(stream (gray::stream-file-descriptor stream-or-fd direction))))
|
||||||
|
|
||||||
|
|
||||||
|
;;; serve-event calls only handlers which are established in the current thread
|
||||||
|
(defmacro do-handlers ((handler-symbol handler-list) &body body)
|
||||||
|
`(dolist (,handler-symbol ,handler-list)
|
||||||
|
#+threads
|
||||||
|
(when (eq mp:*current-process* (handler-thread ,handler-symbol))
|
||||||
|
,@body)
|
||||||
|
#-threads
|
||||||
|
,@body))
|
||||||
|
|
||||||
;;; Add a new handler to *descriptor-handlers*.
|
;;; Add a new handler to *descriptor-handlers*.
|
||||||
(defun add-fd-handler (stream-or-fd direction function)
|
(defun add-fd-handler (stream-or-fd direction function)
|
||||||
"Arrange to call FUNCTION whenever the fd designated by STREAM-OR-FD
|
"Arrange to call FUNCTION whenever the fd designated by STREAM-OR-FD
|
||||||
|
|
@ -101,15 +115,18 @@
|
||||||
(let ((handler (make-handler (coerce-to-descriptor stream-or-fd direction)
|
(let ((handler (make-handler (coerce-to-descriptor stream-or-fd direction)
|
||||||
direction
|
direction
|
||||||
function)))
|
function)))
|
||||||
(push handler *descriptor-handlers*)
|
#+threads (mp:atomic-push handler *descriptor-handlers*)
|
||||||
|
#-threads (push handler *descriptor-handlers*)
|
||||||
handler))
|
handler))
|
||||||
|
|
||||||
;;; Remove an old handler from *descriptor-handlers*.
|
;;; Remove an old handler from *descriptor-handlers*.
|
||||||
(defun remove-fd-handler (handler)
|
(defun remove-fd-handler (handler)
|
||||||
#!+sb-doc
|
|
||||||
"Removes HANDLER from the list of active handlers."
|
"Removes HANDLER from the list of active handlers."
|
||||||
(setf *descriptor-handlers*
|
#+threads (mp:atomic-update *descriptor-handlers*
|
||||||
(delete handler *descriptor-handlers*)))
|
#'(lambda (all-handlers)
|
||||||
|
(remove handler all-handlers)))
|
||||||
|
#-threads (setf *descriptor-handlers*
|
||||||
|
(delete handler *descriptor-handlers*)))
|
||||||
|
|
||||||
;;; Add the handler to *descriptor-handlers* for the duration of BODY.
|
;;; Add the handler to *descriptor-handlers* for the duration of BODY.
|
||||||
(defmacro with-fd-handler ((fd direction function) &rest body)
|
(defmacro with-fd-handler ((fd direction function) &rest body)
|
||||||
|
|
@ -166,13 +183,13 @@
|
||||||
|
|
||||||
(let ((maxfd 0))
|
(let ((maxfd 0))
|
||||||
;; Load the descriptors into the relevant set
|
;; Load the descriptors into the relevant set
|
||||||
(dolist (handler *descriptor-handlers*)
|
(do-handlers (handler *descriptor-handlers*)
|
||||||
(let ((fd (handler-descriptor handler)))
|
(let ((fd (handler-descriptor handler)))
|
||||||
(ecase (handler-direction handler)
|
(ecase (handler-direction handler)
|
||||||
(:input (fd-set fd rfd))
|
(:input (fd-set fd rfd))
|
||||||
(:output (fd-set fd wfd)))
|
(:output (fd-set fd wfd)))
|
||||||
(when (> fd maxfd)
|
(when (> fd maxfd)
|
||||||
(setf maxfd fd))))
|
(setf maxfd fd))))
|
||||||
|
|
||||||
(multiple-value-bind (retval errno)
|
(multiple-value-bind (retval errno)
|
||||||
(if (null seconds)
|
(if (null seconds)
|
||||||
|
|
@ -207,7 +224,7 @@
|
||||||
;; otherwise error
|
;; otherwise error
|
||||||
(error "Error during select")))
|
(error "Error during select")))
|
||||||
((plusp retval)
|
((plusp retval)
|
||||||
(dolist (handler *descriptor-handlers*)
|
(do-handlers (handler *descriptor-handlers*)
|
||||||
(let ((fd (handler-descriptor handler)))
|
(let ((fd (handler-descriptor handler)))
|
||||||
(if (plusp (ecase (handler-direction handler)
|
(if (plusp (ecase (handler-direction handler)
|
||||||
(:input (fd-isset fd rfd))
|
(:input (fd-isset fd rfd))
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue