Rewrite job system using clos

This commit is contained in:
Renaud Casenave-Péré 2015-06-08 18:52:54 +02:00
parent 7c3a9a01c0
commit f31a433854
8 changed files with 174 additions and 253 deletions

View file

@ -4,7 +4,7 @@
|#
(uiop:define-package :stoe/core/containers
(:use :cl :stoe/core/thread)
(:use :cl :stoe/core/thread)
(:export #:queue #:stack #:make-queue #:make-stack
#:enqueue #:dequeue #:push-stack #:pop-stack #:peek
#:safe-queue #:safe-stack
@ -67,7 +67,7 @@
(first data)))
(defclass safe-container-mixin ()
((mutex :initform (make-mutex))
((lock :initform (make-lock))
(waitp :initarg :waitp :accessor safe-container-wait-p))
(:documentation "A mixin for thread-safe containers."))
@ -90,26 +90,26 @@ if `waitp', don't return until the mutex is released."
(make-instance 'safe-stack :waitp waitp))
(defmethod enqueue :around ((queue safe-queue) elt)
(with-slots (mutex waitp) queue
(with-mutex (mutex :waitp waitp)
(with-slots (lock waitp) queue
(with-lock-held (lock waitp)
(call-next-method))))
(defmethod dequeue :around ((queue safe-queue))
(with-slots (mutex waitp) queue
(with-mutex (mutex :waitp waitp)
(with-slots (lock waitp) queue
(with-lock-held (lock waitp)
(call-next-method))))
(defmethod push-stack :around ((stack safe-stack) elt)
(with-slots (mutex waitp) stack
(with-mutex (mutex :waitp waitp)
(with-slots (lock waitp) stack
(with-lock-held (lock waitp)
(call-next-method))))
(defmethod pop-stack :around ((stack safe-stack))
(with-slots (mutex waitp) stack
(with-mutex (mutex :waitp waitp)
(with-slots (lock waitp) stack
(with-lock-held (lock waitp)
(call-next-method))))
(defmethod peek :around ((container safe-container-mixin))
(with-slots (mutex waitp) container
(with-mutex (mutex :waitp waitp)
(with-slots (lock waitp) container
(with-lock-held (lock waitp)
(call-next-method))))

View file

@ -20,4 +20,4 @@
"Load the file specified by `filepath' asynchronally unless `sync' is true."
(if sync
(do-load-file filepath type)
(push-job #'do-load-file (list filepath type))))
(push-new-job #'do-load-file (list filepath type))))

View file

@ -8,168 +8,158 @@
:stoe/core/utils
:stoe/core/thread
:stoe/core/containers)
(:export #:job
#:job-result
#:push-job
#:wait-for-job
#:cancel-job)
(:export #:job #:job-id #:job-fun #:job-args #:job-callback
#:job-thread #:thread-id #:thread-terminate-p
#:task-thread #:thread-task-queue
#:push-new-job #:push-new-job-thread #:push-new-task-thread
#:terminate-thread)
(:import-from :stoe/core/modules
#:defmodule))
(in-package :stoe/core/jobs)
(defstruct job
(handle -1 :read-only t)
(fun nil :read-only t)
(args nil :read-only t)
(assigned-thread -1)
(running nil)
(completed nil)
(result nil)
(canceled nil)
(waitqueue (make-waitqueue))
(mutex (make-mutex)))
(defclass job ()
((id :initarg :id :reader job-id)
(fun :initarg :fun :reader job-fun
:documentation "The entry point of the job.")
(args :initarg :args :reader job-args
:documentation "The arguments given to the entry point function.")
(callback :initarg :callback :reader job-callback
:documentation "The function called upon completion of the job")))
(defstruct command
(fun nil :read-only t)
(args nil :read-only t))
(defclass base-thread ()
((id :initarg :id :reader thread-id)
(thread)
(terminatep :initform nil :accessor thread-terminate-p))
(:documentation "Base class for threads."))
(defstruct (thread (:constructor %make-thread))
(id 0 :read-only t)
(thread nil)
(termination-requested nil)
(command-queue (make-safe-queue nil)))
(defclass job-thread (base-thread)
()
(:documentation "Threads sharing a job queue."))
(defclass task-thread (base-thread)
((task-queue :initform (make-safe-queue nil) :accessor thread-task-queue))
(:documentation "Threads with an individual task queue."))
(defvar *thread-list* nil)
(defvar *job-list* (make-queue))
(defvar *job-waitqueue* (make-waitqueue :name "job-waitqueue"))
(defvar *job-mutex* (make-mutex "job-mutex"))
(defvar *next-handle* -1)
(defvar *job-queue* (make-queue))
(defvar *job-waitqueue* (make-condition-variable :name "job-waitqueue"))
(defvar *job-lock* (make-lock "job-lock"))
(defvar *current-thread-object* nil)
(defun make-job-thread (fun id &optional args)
(let ((job-id 0)
(thread-id 0))
(defun make-job-id ()
(incf job-id))
(defun make-thread-id ()
(incf thread-id)))
(defun make-job (id fun args callback)
(make-instance 'job :id id :fun fun :args args :callback callback))
(defun push-new-job (fun &optional args callback)
(let ((job (make-job (make-job-id) fun args callback)))
(with-lock-held (*job-lock*)
(enqueue *job-queue* job)
(condition-notify *job-waitqueue*))
job))
(defun make-base-thread (type id fun)
"Create a new thread."
(let* ((thread-object (%make-thread :id id))
(thread (make-thread fun :name (format nil "Thread ~a" id) :args (append (list thread-object) args))))
(setf (thread-thread thread-object) thread)
(let ((thread-object (make-instance type :id id)))
(with-slots (thread) thread-object
(setf thread (make-thread fun :name (format nil "Thread ~a" id)
:initial-bindings (cons (cons '*current-thread-object* thread-object) *default-special-bindings*))))
thread-object))
(defun make-job-thread (id fun)
"Create a new job thread."
(make-base-thread 'job-thread id fun))
(defun make-task-thread (id fun)
"Create a new task thread."
(make-base-thread 'task-thread id fun))
(defun push-new-thread (fun)
(let ((thread (make-base-thread 'base-thread (make-thread-id) fun)))
(push thread *thread-list*)
thread))
(defun push-new-job-thread (fun)
(let ((thread (make-job-thread (make-thread-id) fun)))
(push thread *thread-list*)
thread))
(defun push-new-task-thread (fun)
(let ((thread (make-task-thread (make-thread-id) fun)))
(push thread *thread-list*)
thread))
(defun terminate-thread (thread)
"Terminate THREAD."
(setf (thread-terminate-p thread) t)
(condition-notify *job-waitqueue*))
(defun initialize (&optional argv)
"Initialize the jobs module."
(format t "Initialize Job system~%")
(let ((thread-count (get-command-line-option-number argv "-j" 1)))
(when (> thread-count 0)
(setf *thread-list*
(make-array (list thread-count) :initial-contents
(loop for i below thread-count
collect (let ((thread (make-job-thread #'thread-loop i)))
(push-command #'initialize-thread nil thread)
thread)))))))
(let ((thread-count (get-command-line-option-number argv "-j" 0)))
(loop for i below thread-count
do (push-new-job-thread #'start-job-thread))))
(defun finalize ()
"Finalize the jobs module."
(format t "Finalize Job system~%")
(loop for i below (array-dimension *thread-list* 0)
do (push-command #'terminate-thread nil i))
(loop while (some (lambda (elt) (not (null elt))) *thread-list*)
do (update 0)))
(update 0.0))
(defun update (delta-time)
"Tick all running jobs to update their timer and retrieve their result value.
If a thread is available, assign a new job to it."
(declare (ignorable delta-time))
(loop for i below (array-dimension *thread-list* 0)
do (let ((thread (aref *thread-list* i)))
(when thread
(if (not (thread-alive-p (thread-thread thread)))
(finalize-thread thread))))))
"Check finished threads and join them."
(declare (ignore delta-time))
(setf *thread-list*
(remove-if (lambda (th)
(with-slots (thread) th
(when (and thread (not (thread-alive-p thread)))
(restartable
(join-thread thread))
t)))
*thread-list*)))
(defmodule stoe/core/jobs)
(defun push-job (fun args)
"Create a new job using `fun' and `data' and push it into the job-list."
(let ((job (make-job :handle (incf *next-handle*) :fun fun :args args)))
(with-mutex (*job-mutex*)
(enqueue *job-list* job)
(condition-notify *job-waitqueue*))
job))
(defun wait-for-next-job (waitqueue job-queue lock)
(with-lock-held (lock)
(unless (peek job-queue)
(condition-wait waitqueue lock))
(when (peek job-queue)
(dequeue job-queue))))
(defun wait-for-job (job &optional (waitp t) timeout)
"Wait for `job' to be completed. Return immediately either way if `waitp' is non-nil.
If `timeout' is specified, return even if job hasn't been completed.
(defgeneric job-run (job thread))
(defmethod job-run ((job job) thread)
(with-accessors ((callback job-callback) (fun job-fun) (args job-args)) job
(let ((result (apply fun args)))
(when callback
(funcall callback result)))))
Returns t if the job has completed, nil otherwise."
(or (job-completed job)
(and waitp
(with-mutex ((job-mutex job))
(if timeout
(condition-wait (job-waitqueue job) (job-mutex job) :timeout timeout)
(loop until (job-completed job)
do (condition-wait (job-waitqueue job) (job-mutex job))))
(job-completed job)))))
(defun cancel-job (job)
"Try and cancel a job request.
Return t if job has been successfully canceled, nil if it currently running."
(with-mutex (*job-mutex*)
(and (not (job-running job))
(setf (job-canceled job) t))))
(defun push-command (fun args thread-or-id)
"Assign the command `fun' to the thread `thread-id'."
(let ((thread (or (and (thread-p thread-or-id) thread-or-id) (aref *thread-list* thread-or-id))))
(when thread
(enqueue (thread-command-queue thread) (make-command :fun fun :args args))
(with-mutex (*job-mutex*)
(condition-broadcast *job-waitqueue*)))))
(defun initialize-thread (thread)
(defgeneric thread-initialize (thread))
(defmethod thread-initialize ((thread base-thread))
"Initialize a thread."
(format t "Initialize thread ~a~%" (thread-id thread)))
(defun finalize-thread (thread)
"Finalize a thread."
(let ((thread-id (thread-id thread)))
(format t "Finalize thread ~a~%" thread-id)
(join-thread (thread-thread thread) :default 'join-error)
(if (not (thread-termination-requested thread))
;; If the thread wasn't requested to terminate, something wrong happened, restart a new one
(let ((new-thread (make-job-thread #'thread-loop thread-id)))
(push-command #'initialize-thread nil new-thread)
(setf (aref *thread-list* thread-id) new-thread))
(setf (aref *thread-list* thread-id) nil))))
(defgeneric thread-finalize (thread))
(defmethod thread-finalize ((thread base-thread))
(format t "Finalize thread ~a~%" (thread-id thread)))
(defun terminate-thread (thread)
"Set a thread's `termination-requested' flag to t."
(setf (thread-termination-requested thread) t))
(defgeneric thread-process (thread))
(defmethod thread-process ((thread job-thread))
(loop until (thread-terminate-p thread)
do (let ((job (wait-for-next-job *job-waitqueue* *job-queue* *job-lock*)))
(when job
(format t "Thread ~a: Running job ~a~%"
(thread-id thread) (job-id job))
(restartable
(job-run job thread))))))
(defun wait-for-next-job (waitqueue job-list lock)
"Wait for a job to be available and return it."
(with-mutex (lock)
(let ((job nil))
(condition-wait waitqueue lock)
(when (peek job-list)
(setf job (dequeue job-list))
(setf (job-running job) t))
job)))
(defun thread-loop (thread)
"Run the thread loop.
Wait on the job queue for a new job and update the thread status."
(loop until (thread-termination-requested thread)
do (let ((job (wait-for-next-job *job-waitqueue* *job-list* *job-mutex*)))
(restartable
(when job
(format t "Thread ~a: Running job ~a~%" (thread-id thread) (job-handle job))
(run-job job))
(update-thread thread)))))
(defun update-thread (thread)
"Update a thread status.
throw `exit-thread-loop' if the main thread has requested it to terminate."
(let ((command (dequeue (thread-command-queue thread))))
(when command
(apply (command-fun command) thread (command-args command)))))
(defun run-job (job)
(setf (job-result job) (apply (job-fun job) (job-args job)))
(atomic-set-flag (job-completed job) t)
(atomic-set-flag (job-running job) nil))
(defun start-job-thread ()
(let ((thread *current-thread-object*))
(thread-initialize thread)
(thread-process thread)
(thread-finalize thread)))

View file

@ -4,106 +4,39 @@
|#
(uiop:define-package :stoe/core/thread
(:use :cl :stoe/core/utils)
(:export #:make-thread #:join-thread #:thread-alive-p #:current-thread #:thread-name
#:make-mutex #:grab-mutex #:release-mutex
#:with-mutex #:with-recursive-lock
#:make-waitqueue #:waitqueue-name
#:condition-wait #:condition-notify #:condition-broadcast
#:atomic-set-flag))
(:use :cl :alexandria)
(:recycle :bordeaux-threads)
(:export #:thread #:make-thread #:current-thread #:threadp #:thread-name
#:*default-special-bindings*
#:make-lock #:acquire-lock #:release-lock #:with-lock-held
#:make-recursive-lock #:acquire-recursive-lock
#:release-recursive-lock #:with-recursive-lock-held
#:make-condition-variable #:condition-wait #:condition-notify
#:with-timeout #:timeout
#:all-threads #:interrupt-thread #:destroy-thread #:thread-alive-p
#:join-thread #:thread-yield))
(in-package :stoe/core/thread)
(defun make-thread (fun &key name args)
"Create a new thread named `name' that runs `fun', with `args' passed as
arguments."
#+(and sbcl sb-thread) (sb-thread:make-thread fun :name name :arguments args)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defmacro with-lock-held ((place &optional (waitp t)) &body body)
(once-only (place)
`(when (acquire-lock ,place ,waitp)
(prog1
(progn
,@body)
(release-lock ,place)))))
(defun join-thread (thread &key (default nil defaultp) timeout)
"Suspend current thread until `thread' exits. Return the result values of the
thread function."
#+(and sbcl sb-thread)
(if defaultp
(sb-thread:join-thread thread :default default :timeout timeout)
(sb-thread:join-thread thread :timeout timeout))
#-(and sbcl sb-thread) (error-implementation-unsupported))
;;; Functions not implemented by bordeaux-threads
(defun thread-alive-p (thread)
"Return t if `thread' is alive."
#+(and sbcl sb-thread) (sb-thread:thread-alive-p thread)
#-(and sbcl sb-thread) (error-implementation-unsupported))
;; (defun condition-broadcast (queue)
;; "Notify all threads waiting on `queue'."
;; #+(and sbcl sb-thread) (sb-thread:condition-broadcast queue)
;; #-(and sbcl sb-thread) (error-implementation-unsupported))
(defun current-thread ()
"Return the current thread."
#+(and sbcl sb-thread) sb-thread:*current-thread*
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun thread-name (thread)
"Return the name of THREAD."
#+(and sbcl sb-thread) (sb-thread:thread-name thread)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun make-mutex (&optional name)
"Create a mutex."
#+(and sbcl sb-thread) (sb-thread:make-mutex :name name)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun grab-mutex (mutex &key (waitp t) (timeout nil))
"Acquire mutex for the current thread."
#+(and sbcl sb-thread) (sb-thread:grab-mutex mutex :waitp waitp :timeout timeout)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun release-mutex (mutex)
"Release `mutex'."
#+(and sbcl sb-thread) (sb-thread:release-mutex mutex :if-not-owner :punt)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defmacro with-mutex ((mutex &key (waitp t) timeout) &body body)
"Acquire `mutex' for the dynamic scope of body."
#+(and sbcl sb-thread)
`(sb-thread:with-mutex (,mutex :wait-p ,waitp :timeout ,timeout)
,@body)
#-(and sbcl sb-thread)
`(error-implementation-unsupported))
(defmacro with-recursive-lock ((mutex &key (waitp t) timeout) &body body)
"Acquire `mutex' for the dynamic scope of body and allow recursive lock."
#+(and sbcl sb-thread)
`(sb-thread:with-recursive-lock (,mutex :wait-p ,waitp :timeout ,timeout)
,@body)
#-(and sbcl sb-thread)
`(error-implementation-unsupported))
(defun make-waitqueue (&key name)
"Create a waitqueue."
#+(and sbcl sb-thread) (sb-thread:make-waitqueue :name name)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun waitqueue-name (instance)
"The name of the waitqueue."
#+(and sbcl sb-thread) (sb-thread:waitqueue-name instance)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun condition-wait (queue mutex &key timeout)
"Start waiting on `queue' until another thread wakes us up."
#+(and sbcl sb-thread) (sb-thread:condition-wait queue mutex :timeout timeout)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun condition-notify (queue &optional (n 1))
"Notify `n' threads waiting on `queue'."
#+(and sbcl sb-thread) (sb-thread:condition-notify queue n)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defun condition-broadcast (queue)
"Notify all threads waiting on `queue'."
#+(and sbcl sb-thread) (sb-thread:condition-broadcast queue)
#-(and sbcl sb-thread) (error-implementation-unsupported))
(defmacro atomic-set-flag (place flag)
"Set the variable pointed to by `place' to the value `flag' atomically."
#+ (and sbcl sb-thread)
`(flet ((set-flag (flag place)
(declare (ignore place))
flag))
(sb-ext:atomic-update ,place #'set-flag ,flag))
#- (and sbcl sb-thread) (error-implemntation-unsupported))
;; (defmacro atomic-set-flag (place flag)
;; "Set the variable pointed to by `place' to the value `flag' atomically."
;; #+ (and sbcl sb-thread)
;; `(flet ((set-flag (flag place)
;; (declare (ignore place))
;; flag))
;; (sb-ext:atomic-update ,place #'set-flag ,flag))
;; #- (and sbcl sb-thread) (error-implemntation-unsupported))

View file

@ -50,15 +50,12 @@
(nreverse (cons source acc))))))
(if source (rec source nil) nil)))
(defmacro restartable (unprotected &body body)
(defmacro restartable (&body body)
"Provide a Continue restart unless `unprotected' is t."
`(if ,unprotected
`(restart-case
(progn
,@body)
(restart-case
(progn
,@body)
(continue () :report "Continue"))))
(continue () :report "Continue")))
(defmacro loop-with-progress (msg &body body)
"Allow a looping process to display feedback."

View file

@ -39,4 +39,4 @@ children from being rendered."))
(loop for child in children
do (progn
(apply fun child)
(walk fun child)))))
(walk-scene fun child)))))

View file

@ -81,6 +81,6 @@ continue unless `unprotected' is t."
(initialize argv)
(unwind-protect
(progn
(game-start)
;; (game-start)
(main-loop))
(finalize)))

View file

@ -37,6 +37,7 @@
(proclaim '(optimize (debug 3) (safety 3) (speed 0)))
(funcall thunk))
:depends-on ("alexandria"
"bordeaux-threads"
"cl-opengl"
"glop"
"stoe/maths/all"