Fix the job module interface to make use of specialized threads

This commit is contained in:
Renaud Casenave-Péré 2015-08-21 17:14:04 +02:00
parent d6af14d552
commit e9be960167
2 changed files with 62 additions and 52 deletions

View file

@ -8,17 +8,20 @@
:stoe/core/utils
:stoe/core/thread
:stoe/core/containers)
(:export #:job #:job-id #:job-fun #:job-args #:job-callback
#:job-thread #:thread-id #:thread-terminate-p
#:task-thread #:thread-task-queue
#:push-new-job #:push-new-job-thread #:push-new-task-thread
#:terminate-thread)
(:export #:job #:job-fun #:job-args #:job-callback
#:job-thread #:thread-terminate-p
#:specialized-thread #:job-queue
#:push-new-job #:push-new-thread
#:push-new-job-thread #:push-new-specialized-thread
#:get-next-job #:job-run
#:terminate-thread
#:thread-initialize #:thread-finalize #:thread-process)
(:import-from :stoe/core/modules
#:defmodule))
(in-package :stoe/core/jobs)
(defclass job ()
((id :initarg :id :reader job-id)
((id :initarg :id :reader id)
(fun :initarg :fun :reader job-fun
:documentation "The entry point of the job.")
(args :initarg :args :reader job-args
@ -29,7 +32,8 @@
:documentation "The function called when an error has occured regarding the job.")))
(defclass base-thread ()
((id :initarg :id :reader thread-id)
((name :initarg :name :reader name)
(id :initarg :id :reader id)
(thread)
(terminatep :initform nil :accessor thread-terminate-p))
(:documentation "Base class for threads."))
@ -38,11 +42,12 @@
()
(:documentation "Threads sharing a job queue."))
(defclass task-thread (base-thread)
((task-queue :initform (make-safe-queue nil) :accessor thread-task-queue))
(:documentation "Threads with an individual task queue."))
(defclass specialized-thread (base-thread)
((job-queue :initform (make-safe-queue nil) :accessor job-queue))
(:documentation "Threads with an individual job queue."))
(defvar *thread-list* nil)
(defvar *job-thread-count* 0)
(defvar *job-queue* (make-queue))
(defvar *job-waitqueue* (make-condition-variable :name "job-waitqueue"))
(defvar *job-lock* (make-lock "job-lock"))
@ -53,14 +58,14 @@
(defun make-job-id ()
(incf job-id))
(defun reset-job-ids ()
(setq job-id 0))
(setf job-id 0))
(defun make-thread-id ()
(incf thread-id))
(defun reset-thread-ids ()
(setq thread-id 0)))
(setf thread-id 0)))
(defun thread-available-p ()
(> (length *thread-list*) 0))
(defun job-thread-available-p ()
(> *job-thread-count* 0))
(defun make-job (id fun args callback errback)
(make-instance 'job :id id :fun fun :args args :callback callback :errback errback))
@ -68,42 +73,42 @@
(defun push-new-job (fun &optional args)
(with-promise (resolve reject :resolve-fn resolver :reject-fn rejecter)
(let ((job (make-job (make-job-id) fun args resolver rejecter)))
(if (thread-available-p)
(if (job-thread-available-p)
(with-lock-held (*job-lock*)
(enqueue *job-queue* job)
(condition-notify *job-waitqueue*))
(job-run job *current-thread-object*)))))
(defun make-base-thread (type id fun)
(defun make-base-thread (type name fun)
"Create a new thread."
(let ((thread-object (make-instance type :id id)))
(let* ((id (make-thread-id))
(thread-object (make-instance type :name name :id id)))
(with-slots (thread) thread-object
(setf thread (make-thread fun :name (format nil "Thread ~a" id)
:initial-bindings (cons (cons '*current-thread-object* thread-object) *default-special-bindings*))))
(setf thread (make-thread fun :name name
:initial-bindings
(cons (cons '*current-thread-object* thread-object)
*default-special-bindings*))))
thread-object))
(defun make-job-thread (id fun)
(defun make-job-thread (name fun)
"Create a new job thread."
(make-base-thread 'job-thread id fun))
(make-base-thread 'job-thread name fun))
(defun make-task-thread (id fun)
"Create a new task thread."
(make-base-thread 'task-thread id fun))
(defun make-specialized-thread (name fun)
"Create a new specialized thread."
(make-base-thread 'specialized-thread name fun))
(defun push-new-thread (fun)
(let ((thread (make-base-thread 'base-thread (make-thread-id) fun)))
(defun push-new-thread (type name)
(let ((thread (make-base-thread type name #'start-thread)))
(push thread *thread-list*)
thread))
(defun push-new-job-thread (fun)
(let ((thread (make-job-thread (make-thread-id) fun)))
(push thread *thread-list*)
thread))
(defun push-new-job-thread (&optional name)
(push-new-thread 'job-thread name)
(incf *job-thread-count*))
(defun push-new-task-thread (fun)
(let ((thread (make-task-thread (make-thread-id) fun)))
(push thread *thread-list*)
thread))
(defun push-new-specialized-thread (&optional name)
(push-new-thread 'specialized-thread name))
(defun terminate-thread (thread)
"Terminate THREAD."
@ -113,13 +118,14 @@
(defun initialize (&optional argv)
"Initialize the jobs module."
(format t "Initialize Job system~%")
(let ((main-thread (make-instance 'base-thread :id (make-thread-id))))
(let ((main-thread (make-instance 'base-thread :name "Main Thread"
:id (make-thread-id))))
(with-slots (thread) main-thread
(setf thread (current-thread)))
(setq *current-thread-object* main-thread))
(let ((thread-count (get-command-line-option-number argv "-j" 0)))
(loop for i below thread-count
do (push-new-job-thread #'start-job-thread))))
do (push-new-job-thread))))
(defun finalize ()
"Finalize the jobs module."
@ -146,12 +152,17 @@
(defmodule stoe/core/jobs)
(defun wait-for-next-job (waitqueue job-queue lock)
(with-lock-held (lock)
(unless (peek job-queue)
(condition-wait waitqueue lock))
(when (peek job-queue)
(dequeue job-queue))))
(defgeneric get-next-job (thread))
(defmethod get-next-job ((thread base-thread)))
(defmethod get-next-job ((thread job-thread))
(with-lock-held (*job-lock*)
(unless (peek *job-queue*)
(condition-wait *job-waitqueue* *job-lock*))
(when (peek *job-queue*)
(dequeue *job-queue*))))
(defmethod get-next-job ((thread specialized-thread))
(dequeue (job-queue thread)))
(defgeneric job-run (job thread))
(defmethod job-run ((job job) thread)
@ -163,23 +174,22 @@
(defgeneric thread-initialize (thread))
(defmethod thread-initialize ((thread base-thread))
"Initialize a thread."
(format t "Initialize thread ~a~%" (thread-id thread)))
(format t "Initialize thread ~a~%" (name thread)))
(defgeneric thread-finalize (thread))
(defmethod thread-finalize ((thread base-thread))
(format t "Finalize thread ~a~%" (thread-id thread)))
(format t "Finalize thread ~a~%" (name thread)))
(defgeneric thread-process (thread))
(defmethod thread-process ((thread job-thread))
(defmethod thread-process ((thread base-thread))
(loop until (thread-terminate-p thread)
do (let ((job (wait-for-next-job *job-waitqueue* *job-queue* *job-lock*)))
do (let ((job (get-next-job thread)))
(when job
(format t "Thread ~a: Running job ~a~%"
(thread-id thread) (job-id job))
(format t "Thread ~a: Running job ~a~%" (name thread) (id job))
(restartable
(job-run job thread))))))
(defun start-job-thread ()
(defun start-thread ()
(let ((thread *current-thread-object*))
(thread-initialize thread)
(thread-process thread)

View file

@ -20,9 +20,9 @@
(defmacro with-lock-held ((place &optional (waitp t)) &body body)
(once-only (place)
`(when (acquire-lock ,place ,waitp)
(prog1
(progn
,@body)
(unwind-protect
(progn
,@body)
(release-lock ,place)))))
;;; Functions not implemented by bordeaux-threads