stoe/core/jobs.lisp

220 lines
7.4 KiB
Common Lisp

#|
This file is a part of stoe project.
Copyright (c) 2015 Renaud Casenave-Péré (renaud@casenave-pere.fr)
|#
(uiop:define-package :stoe/core/jobs
(:use :cl :blackbird
:stoe/core/utils
:stoe/core/thread
:stoe/core/containers)
(:export #:job #:job-fun #:job-args #:job-callback
#:job-thread #:thread-terminate-p
#:specialized-thread #:job-queue
#:async-job #:eval-on-thread #:push-new-thread
#:push-new-job-thread #:push-new-specialized-thread
#:get-next-job #:job-run
#:terminate-thread
#:thread-initialize #:thread-finalize #:thread-process)
(:import-from :stoe/core/modules
#:defmodule))
(in-package :stoe/core/jobs)
#+stoe-debug
(setf blackbird-base:*debug-on-error* t)
(defclass job ()
((id :initarg :id :reader id)
(fun :initarg :fun :reader job-fun
:documentation "The entry point of the job.")
(args :initarg :args :reader job-args
:documentation "The arguments given to the entry point function.")
(callback :initarg :callback :reader job-callback
:documentation "The function called upon completion of the job.")
(errback :initarg :errback :reader job-errback
:documentation "The function called when an error has occured regarding the job.")))
(defclass base-thread ()
((name :initarg :name :reader name)
(id :initarg :id :reader id)
(thread)
(terminatep :initform nil :accessor thread-terminate-p))
(:documentation "Base class for threads."))
(defclass job-thread (base-thread)
()
(:documentation "Threads sharing a job queue."))
(defclass specialized-thread (base-thread)
((job-queue :initform (make-safe-queue nil) :accessor job-queue))
(:documentation "Threads with an individual job queue."))
(defvar *thread-list* nil)
(defvar *job-thread-count* 0)
(defvar *job-queue* (make-queue))
(defvar *job-waitqueue* (make-condition-variable :name "job-waitqueue"))
(defvar *job-lock* (make-lock "job-lock"))
(defvar *current-thread-object* nil)
(let ((job-id 0)
(thread-id 0))
(defun make-job-id ()
(incf job-id))
(defun reset-job-ids ()
(setf job-id 0))
(defun make-thread-id ()
(incf thread-id))
(defun reset-thread-ids ()
(setf thread-id 0)))
(defun job-thread-available-p ()
(> (reduce #'+ (mapcar (lambda (x) (if (typep x 'job-thread) 1 0)) *thread-list*)) 0))
(defun make-job (id fun args callback errback)
(make-instance 'job :id id :fun fun :args args :callback callback :errback errback))
(defun push-new-job (fun &optional args)
(with-promise (resolve reject :resolve-fn resolver :reject-fn rejecter)
(let ((job (make-job (make-job-id) fun args resolver rejecter)))
(if (job-thread-available-p)
(with-lock-held (*job-lock*)
(enqueue *job-queue* job)
(condition-notify *job-waitqueue*))
(job-run job *current-thread-object*)))))
(defmacro async-job (args &body body)
(if args
`(push-new-job (lambda ,args ,@body) (list ,@args))
`(push-new-job (lambda () ,@body))))
(defun push-job-to-thread (thread fun &optional args)
(with-promise (resolve reject :resolve-fn resolver :reject-fn rejecter)
(let ((job (make-job (make-job-id) fun args resolver rejecter)))
(if thread
(enqueue (job-queue thread) job)
(error "Thread ~a is not available~%" thread)))))
(defmacro eval-on-thread (args thread &body body)
(if args
`(push-job-to-thread ,thread (lambda ,args ,@body) (list ,@args))
`(push-job-to-thread ,thread (lambda () ,@body))))
(defun make-base-thread (type name fun)
"Create a new thread."
(let* ((id (make-thread-id))
(thread-object (make-instance type :name name :id id)))
(with-slots (thread) thread-object
(setf thread (make-thread fun :name name
:initial-bindings
(cons (cons '*current-thread-object* thread-object)
*default-special-bindings*))))
thread-object))
(defun make-job-thread (name fun)
"Create a new job thread."
(make-base-thread 'job-thread name fun))
(defun make-specialized-thread (name fun)
"Create a new specialized thread."
(make-base-thread 'specialized-thread name fun))
(defun push-new-thread (type name)
(let ((thread (make-base-thread type name #'start-thread)))
(push thread *thread-list*)
thread))
(defun push-new-job-thread (&optional name)
(push-new-thread 'job-thread name)
(incf *job-thread-count*))
(defun push-new-specialized-thread (&optional name)
(push-new-thread 'specialized-thread name))
(defun terminate-thread (thread)
"Terminate THREAD."
(setf (thread-terminate-p thread) t)
(condition-notify *job-waitqueue*))
(defun initialize (&optional argv)
"Initialize the jobs module."
(format t "Initialize Job system~%")
(let ((main-thread (make-instance 'base-thread :name "Main Thread"
:id (make-thread-id))))
(with-slots (thread) main-thread
(setf thread (current-thread)))
(setq *current-thread-object* main-thread))
(let ((thread-count (get-command-line-option-number argv "-j" 0)))
(loop for i below thread-count
do (push-new-job-thread))))
(defun finalize ()
"Finalize the jobs module."
(loop-with-progress "Finalize Job system"
while (> (length *thread-list*) 0)
do (progn
(update 0.0)
(sleep 0.1)
progress-step))
(assert (eq (length *thread-list*) 0))
(loop as job = (dequeue *job-queue*)
while job
do (funcall (job-errback job) 'job-canceled))
(reset-job-ids)
(reset-thread-ids))
(defun update (delta-time)
"Check finished threads and join them."
(declare (ignore delta-time))
(setf *thread-list*
(remove-if (lambda (th)
(with-slots (thread) th
(when (and thread (not (thread-alive-p thread)))
(restartable
(join-thread thread))
t)))
*thread-list*)))
(defmodule stoe/core/jobs :jobs)
(defgeneric get-next-job (thread))
(defmethod get-next-job ((thread base-thread)))
(defmethod get-next-job ((thread job-thread))
(with-lock-held (*job-lock*)
(unless (peek *job-queue*)
(condition-wait *job-waitqueue* *job-lock*))
(when (peek *job-queue*)
(dequeue *job-queue*))))
(defmethod get-next-job ((thread specialized-thread))
(dequeue (job-queue thread)))
(defgeneric job-run (job thread))
(defmethod job-run ((job job) thread)
(with-accessors ((callback job-callback) (fun job-fun) (args job-args)) job
(let ((result (apply fun args)))
(when callback
(funcall callback result)))))
(defgeneric thread-initialize (thread))
(defmethod thread-initialize ((thread base-thread))
"Initialize a thread."
(format t "Initialize thread ~a~%" (name thread)))
(defgeneric thread-finalize (thread))
(defmethod thread-finalize ((thread base-thread))
(format t "Finalize thread ~a~%" (name thread)))
(defgeneric thread-process (thread))
(defmethod thread-process ((thread base-thread))
(loop until (thread-terminate-p thread)
do (let ((job (get-next-job thread)))
(when job
(format t "Thread ~a: Running job ~a~%" (name thread) (id job))
(restartable
(job-run job thread))))))
(defun start-thread ()
(let ((thread *current-thread-object*))
(thread-initialize thread)
(thread-process thread)
(thread-finalize thread)))