Reimplemented the job system using a waitqueue

Job threads now wait on the waitqueue for new jobs to arrive
This commit is contained in:
Renaud Casenave-Péré 2014-08-14 16:44:58 +09:00
parent 8584d5ea91
commit 8107c9b76c

View file

@ -10,18 +10,23 @@
:utils
:thread
:containers)
(:export :push-job))
(:export :job
:push-job
:wait-for-job
:cancel-job))
(in-package :stoe.jobs)
(defstruct job
(handle -1 :read-only t)
(fun nil :read-only t)
(args nil :read-only t)
(callback nil :read-only t)
(assigned-thread -1)
(running nil)
(completed nil)
(result nil))
(result nil)
(canceled nil)
(waitqueue (make-waitqueue))
(mutex (make-mutex)))
(defstruct command
(fun nil :read-only t)
@ -30,12 +35,13 @@
(defstruct (thread (:constructor %make-thread))
(id 0 :read-only t)
(thread nil)
(command-queue (make-safe-queue nil))
(last-updated-clock (make-clock)))
(termination-requested nil)
(command-queue (make-safe-queue nil)))
(defvar *thread-list* nil)
(defvar *job-list* (make-safe-queue nil))
(defvar *finished-job-list* (make-safe-queue nil))
(defvar *job-list* (make-queue))
(defvar *job-waitqueue* (make-waitqueue :name "job-waitqueue"))
(defvar *job-mutex* (make-mutex "job-mutex"))
(defvar *next-handle* -1)
(defun make-job-thread (fun id &optional args)
@ -65,21 +71,6 @@
(loop while (some (lambda (elt) (not (null elt))) *thread-list*)
do (update 0)))
(add-hook modules:*initialize-hook* #'initialize)
(add-hook modules:*finalize-hook* #'finalize)
(defun push-job (fun args callback)
"Create a new job using `fun' and `data' and push it into the job-list."
(let ((job (make-job :handle (incf *next-handle*) :fun fun :args args :callback callback)))
(enqueue *job-list* job)))
(defun push-command (fun args thread-or-id)
"Assign the command `fun' to the thread `thread-id'."
(let ((thread (or (and (thread-p thread-or-id) thread-or-id) (aref *thread-list* thread-or-id)))
(command (make-command :fun fun :args args)))
(when thread
(enqueue (thread-command-queue thread) command))))
(defun update (delta-time)
"Tick all running jobs to update their timer and retrieve their result value.
If a thread is available, assign a new job to it."
@ -88,43 +79,88 @@ If a thread is available, assign a new job to it."
do (let ((thread (aref *thread-list* i)))
(when thread
(if (not (thread-alive-p (thread-thread thread)))
(finalize-thread thread)))))
(loop while (peek *finished-job-list*)
do (let ((job (dequeue *finished-job-list*)))
(when job
(if (and (not (job-running job)) (job-completed job))
(progn
(funcall (job-callback job) (job-result job))
(dequeue *job-list*)))))))
(finalize-thread thread))))))
(add-hook modules:*initialize-hook* #'initialize)
(add-hook modules:*finalize-hook* #'finalize)
(add-hook modules:*update-hook* #'update)
(defun push-job (fun args)
"Create a new job using `fun' and `data' and push it into the job-list."
(let ((job (make-job :handle (incf *next-handle*) :fun fun :args args)))
(with-mutex (*job-mutex*)
(enqueue *job-list* job)
(condition-notify *job-waitqueue*))
job))
(defun wait-for-job (job &optional (waitp t) timeout)
"Wait for `job' to be completed. Return immediately either way if `waitp' is non-nil.
If `timeout' is specified, return even if job hasn't been completed.
Returns t if the job has completed, nil otherwise."
(or (job-completed job)
(and waitp
(with-mutex ((job-mutex job))
(if timeout
(condition-wait(job-waitqueue job) (job-mutex job) :timeout timeout)
(loop until (job-completed job)
do (condition-wait (job-waitqueue job) (job-mutex job))))
(job-completed job)))))
(defun cancel-job (job)
"Try and cancel a job request.
Return t if job has been successfully canceled, nil if it currently running."
(with-mutex (*job-mutex*)
(and (not (job-running job))
(setf (job-canceled job) t))))
(defun push-command (fun args thread-or-id)
"Assign the command `fun' to the thread `thread-id'."
(let ((thread (or (and (thread-p thread-or-id) thread-or-id) (aref *thread-list* thread-or-id))))
(when thread
(enqueue (thread-command-queue thread) (make-command :fun fun :args args))
(with-mutex (*job-mutex*)
(condition-broadcast *job-waitqueue*)))))
(defun initialize-thread (thread)
"Initialize a thread."
(format t "Initialize thread ~a~%" (thread-id thread)))
(defun finalize-thread (thread)
"Finalize a thread."
(format t "Finalize thread ~a~%" (thread-id thread))
(let ((result (join-thread (thread-thread thread) :default 'join-error)))
(unless (eq result 'join-error)
(setf (aref *thread-list* (thread-id thread)) nil))))
(let ((thread-id (thread-id thread)))
(format t "Finalize thread ~a~%" thread-id)
(join-thread (thread-thread thread) :default 'join-error)
(if (and nil (not (thread-termination-requested thread)))
;; If the thread wasn't requested to terminate, something wrong happened, restart a new one
(let ((new-thread (make-job-thread #'thread-loop thread-id)))
(push-command #'initialize-thread nil new-thread)
(setf (aref *thread-list* thread-id) new-thread))
(setf (aref *thread-list* thread-id) nil))))
(defun terminate-thread (thread)
(declare (ignorable thread))
(throw 'exit-thread-loop nil))
"Set a thread's `termination-requested' flag to t."
(setf (thread-termination-requested thread) t))
(defun wait-for-next-job (waitqueue job-list lock)
"Wait for a job to be available and return it."
(with-mutex (lock)
(let ((job nil))
(condition-wait waitqueue lock)
(when (peek job-list)
(setf job (dequeue job-list))
(setf (job-running job) t)))))
(defun thread-loop (thread)
"Run the thread loop.
Poll the thread's job queue for a new job and update the thread status."
(catch 'exit-thread-loop
(loop for job = (dequeue *job-list*)
do (progn
(when job
(format t "Thread ~a: Running job ~a~%" (thread-id thread) (job-handle job))
(run-job job))
(update-thread thread)
(sleep 0.01)))))
Wait on the job queue for a new job and update the thread status."
(loop until (thread-termination-requested thread)
do (let ((job (wait-for-next-job *job-waitqueue* *job-list* *job-mutex*)))
(restartable
(when job
(format t "Thread ~a: Running job ~a~%" (thread-id thread) (job-handle job))
(run-job job))
(update-thread thread)))))
(defun update-thread (thread)
"Update a thread status.
@ -134,8 +170,6 @@ throw `exit-thread-loop' if the main thread has requested it to terminate."
(apply (command-fun command) thread (command-args command)))))
(defun run-job (job)
(setf (job-running job) t)
(setf (job-result job) (apply (job-fun job) (job-args job)))
(setf (job-completed job) t)
(setf (job-running job) nil)
(enqueue *finished-job-list* job))
(atomic-set-flag (job-completed job) t)
(atomic-set-flag (job-running job) nil))