#|
  This file is a part of stoe project.
  Copyright (c) 2015 Renaud Casenave-Péré (renaud@casenave-pere.fr)
|#

(uiop:define-package :stoe/core/jobs
  (:use :cl :blackbird
        :stoe/core/utils
        :stoe/core/thread
        :stoe/core/containers)
  (:export #:job #:job-fun #:job-args #:job-callback
           #:job-thread #:thread-terminate-p
           #:specialized-thread #:job-queue
           #:async-job #:eval-on-thread
           #:push-new-thread #:push-new-job-thread #:push-new-specialized-thread
           #:get-next-job #:job-run #:terminate-thread
           #:thread-initialize #:thread-finalize #:thread-process)
  (:import-from :stoe/core/modules
                #:defmodule))
(in-package :stoe/core/jobs)

(defclass job ()
  ((id :initarg :id :reader id
       :documentation "Identifier of the job, as produced by `make-job-id'.")
   (fun :initarg :fun :reader job-fun
        :documentation "The entry point of the job.")
   (args :initarg :args :reader job-args
         :documentation "The arguments given to the entry point function.")
   (callback :initarg :callback :reader job-callback
             :documentation "The function called upon completion of the job.")
   (errback :initarg :errback :reader job-errback
            :documentation "The function called when an error has occurred regarding the job."))
  (:documentation "A unit of work: a function, its arguments and its completion handlers."))

(defclass base-thread ()
  ((name :initarg :name :reader name
         :documentation "Name of the thread, used in log output.")
   (id :initarg :id :reader id
       :documentation "Identifier of the thread, as produced by `make-thread-id'.")
   ;; Defaults to NIL so that code testing the slot (e.g. `update') can skip
   ;; thread objects whose native thread has not been attached yet, instead
   ;; of signalling UNBOUND-SLOT.
   (thread :initform nil
           :documentation "The underlying native thread, or NIL when none is attached.")
   (terminatep :initform nil :accessor thread-terminate-p
               :documentation "True once the thread has been asked to stop."))
  (:documentation "Base class for threads."))

(defclass job-thread (base-thread)
  ()
  (:documentation "Threads sharing a job queue."))

(defclass specialized-thread (base-thread)
  ((job-queue :initform (make-safe-queue nil) :accessor job-queue
              :documentation "Queue of jobs addressed to this thread only."))
  (:documentation "Threads with an individual job queue."))

(defvar *thread-list* nil
  "List of the thread objects created through `push-new-thread'.")

(defvar *job-thread-count* 0
  "Number of job threads created through `push-new-job-thread'.")

(defvar *job-queue* (make-queue)
  "Queue of pending jobs shared by every job thread.")

(defvar *job-waitqueue* (make-condition-variable :name "job-waitqueue")
  "Condition variable job threads wait on while `*job-queue*' is empty.")

(defvar *job-lock* (make-lock "job-lock")
  "Lock held while accessing `*job-queue*' from job threads.")

(defvar *current-thread-object* nil
  "The thread object describing the thread currently running.")

;; Job and thread identifiers are plain counters closed over by the
;; functions below; the reset functions restart numbering from 1.
(let ((job-id 0)
      (thread-id 0))
  (defun make-job-id ()
    "Return a fresh job identifier."
    (incf job-id))

  (defun reset-job-ids ()
    "Restart the job identifier counter."
    (setf job-id 0))

  (defun make-thread-id ()
    "Return a fresh thread identifier."
    (incf thread-id))

  (defun reset-thread-ids ()
    "Restart the thread identifier counter."
    (setf thread-id 0)))
(defun job-thread-available-p ()
  "Return true when at least one job thread has been registered."
  (plusp *job-thread-count*))

(defun make-job (id fun args callback errback)
  "Create a `job' with identifier ID running FUN on ARGS.
CALLBACK receives the result of FUN; ERRBACK is kept for error reporting."
  (make-instance 'job :id id :fun fun :args args
                      :callback callback :errback errback))

(defun push-new-job (fun &optional args)
  "Schedule FUN to be applied to ARGS and return a promise for its result.
When a job thread is available the job is queued on the shared job queue,
otherwise it is run immediately on the calling thread."
  (with-promise (resolve reject :resolve-fn resolver :reject-fn rejecter)
    (let ((job (make-job (make-job-id) fun args resolver rejecter)))
      (if (job-thread-available-p)
          (with-lock-held (*job-lock*)
            (enqueue *job-queue* job)
            (condition-notify *job-waitqueue*))
          (job-run job *current-thread-object*)))))

(defmacro async-job (args &body body)
  "Run BODY as a job through `push-new-job'.
ARGS is a list of variables whose current values are captured and passed
to the job as its arguments."
  (if args
      `(push-new-job (lambda ,args ,@body) (list ,@args))
      `(push-new-job (lambda () ,@body))))

(defun push-job-to-thread (thread fun &optional args)
  "Queue a job applying FUN to ARGS on THREAD's own job queue.
Return a promise for the result. An error is signalled inside the promise
body when THREAD is NIL."
  (with-promise (resolve reject :resolve-fn resolver :reject-fn rejecter)
    (let ((job (make-job (make-job-id) fun args resolver rejecter)))
      (if thread
          (enqueue (job-queue thread) job)
          (error "Thread ~a is not available~%" thread)))))

(defmacro eval-on-thread (args thread &body body)
  "Run BODY as a job on THREAD through `push-job-to-thread'.
ARGS is a list of variables whose current values are captured and passed
to the job as its arguments."
  (if args
      `(push-job-to-thread ,thread (lambda ,args ,@body) (list ,@args))
      `(push-job-to-thread ,thread (lambda () ,@body))))

(defun make-base-thread (type name fun)
  "Create a new thread.
Instantiate a thread object of class TYPE named NAME, start a native thread
running FUN with `*current-thread-object*' bound to that object, and return
the thread object."
  (let* ((id (make-thread-id))
         (thread-object (make-instance type :name name :id id)))
    (with-slots (thread) thread-object
      (setf thread
            (make-thread fun
                         :name name
                         :initial-bindings (cons (cons '*current-thread-object*
                                                       thread-object)
                                                 *default-special-bindings*))))
    thread-object))

(defun make-job-thread (name fun)
  "Create a new job thread."
  (make-base-thread 'job-thread name fun))

(defun make-specialized-thread (name fun)
  "Create a new specialized thread."
  (make-base-thread 'specialized-thread name fun))

(defun push-new-thread (type name)
  "Start a thread of class TYPE named NAME running `start-thread'.
The thread object is registered in `*thread-list*' and returned."
  (let ((thread (make-base-thread type name #'start-thread)))
    (push thread *thread-list*)
    thread))

(defun push-new-job-thread (&optional name)
  "Start a new job thread and return the updated job thread count."
  ;; NOTE(review): unlike `push-new-specialized-thread' this returns the
  ;; count, not the thread object; kept as is for existing callers.
  (push-new-thread 'job-thread name)
  (incf *job-thread-count*))

(defun push-new-specialized-thread (&optional name)
  "Start a new specialized thread and return its thread object."
  (push-new-thread 'specialized-thread name))

(defun terminate-thread (thread)
  "Terminate THREAD."
  ;; The flag is set and the notification sent while holding `*job-lock*'
  ;; so that a job thread cannot test the flag, miss the notification and
  ;; then wait forever on `*job-waitqueue*'.
  ;; NOTE(review): a single notification wakes only one waiter, which may
  ;; not be THREAD when several job threads are waiting.
  (with-lock-held (*job-lock*)
    (setf (thread-terminate-p thread) t)
    (condition-notify *job-waitqueue*)))

(defun initialize (&optional argv)
  "Initialize the jobs module.
Register the calling thread as the main thread and start as many job
threads as requested by the \"-j\" option found in ARGV (default 0)."
  (format t "Initialize Job system~%")
  (let ((main-thread (make-instance 'base-thread :name "Main Thread"
                                                 :id (make-thread-id))))
    (with-slots (thread) main-thread
      (setf thread (current-thread)))
    (setq *current-thread-object* main-thread))
  (let ((thread-count (get-command-line-option-number argv "-j" 0)))
    (loop repeat thread-count
          do (push-new-job-thread))))

(defun finalize ()
  "Finalize the jobs module.
Join finished threads, check that none is left, cancel the jobs still
pending in the shared queue and reset the module counters."
  (format t "Finalize Job system~%")
  (update 0.0)
  (assert (null *thread-list*))
  (loop as job = (dequeue *job-queue*)
        while job
        do (funcall (job-errback job) 'job-canceled))
  ;; No thread is left at this point: forget the job threads as well,
  ;; otherwise jobs pushed after a later `initialize' would be queued
  ;; for threads that no longer exist.
  (setf *job-thread-count* 0)
  (reset-job-ids)
  (reset-thread-ids))

(defun update (delta-time)
  "Check finished threads and join them.
Threads that are no longer alive are joined and removed from `*thread-list*'.
DELTA-TIME is ignored."
  (declare (ignore delta-time))
  (setf *thread-list*
        (remove-if (lambda (th)
                     (with-slots (thread) th
                       (when (and thread (not (thread-alive-p thread)))
                         (restartable
                           (join-thread thread))
                         t)))
                   *thread-list*)))

(defmodule stoe/core/jobs :jobs)

(defgeneric get-next-job (thread)
  (:documentation "Return the next job THREAD should run, or NIL when there is none."))

(defmethod get-next-job ((thread base-thread))
  "A plain thread has no job source: always return NIL.")

(defmethod get-next-job ((thread job-thread))
  "Take the next job from the shared job queue.
Wait on `*job-waitqueue*' when the queue is empty; return NIL when woken up
without a job to run."
  (with-lock-held (*job-lock*)
    ;; The termination flag is tested under the lock, together with the
    ;; queue, so a request issued by `terminate-thread' is never missed.
    (unless (or (thread-terminate-p thread)
                (peek *job-queue*))
      (condition-wait *job-waitqueue* *job-lock*))
    (when (peek *job-queue*)
      (dequeue *job-queue*))))

(defmethod get-next-job ((thread specialized-thread))
  "Take the next job from THREAD's own job queue."
  (dequeue (job-queue thread)))

(defgeneric job-run (job thread)
  (:documentation "Run JOB on behalf of THREAD."))

(defmethod job-run ((job job) thread)
  "Apply the function of JOB to its arguments and hand the result to its callback, if any."
  (declare (ignorable thread))
  (with-accessors ((callback job-callback) (fun job-fun) (args job-args)) job
    (let ((result (apply fun args)))
      (when callback
        (funcall callback result)))))

(defgeneric thread-initialize (thread)
  (:documentation "Called on THREAD before it starts processing."))

(defmethod thread-initialize ((thread base-thread))
  "Initialize a thread."
  (format t "Initialize thread ~a~%" (name thread)))

(defgeneric thread-finalize (thread)
  (:documentation "Called on THREAD once it has stopped processing."))

(defmethod thread-finalize ((thread base-thread))
  "Finalize a thread."
  (format t "Finalize thread ~a~%" (name thread)))

(defgeneric thread-process (thread)
  (:documentation "Main loop of THREAD."))

(defmethod thread-process ((thread base-thread))
  "Fetch and run jobs until THREAD is asked to terminate."
  (loop until (thread-terminate-p thread)
        do (let ((job (get-next-job thread)))
             (when job
               (format t "Thread ~a: Running job ~a~%" (name thread) (id job))
               (restartable
                 (job-run job thread))))))

(defun start-thread ()
  "Entry point of the threads created by `push-new-thread'.
Initialize, process and finalize the current thread object."
  (let ((thread *current-thread-object*))
    (thread-initialize thread)
    (thread-process thread)
    (thread-finalize thread)))