Add a job system based on multithreading

This commit is contained in:
Renaud Casenave-Péré 2014-01-05 00:06:51 +09:00
parent 981590f246
commit 56ed6cb02a
2 changed files with 138 additions and 0 deletions

136
src/jobs.lisp Normal file
View file

@ -0,0 +1,136 @@
#|
This file is a part of stoe project.
Copyright (c) 2014 Renaud Casenave-Péré (renaud@casenave-pere.fr)
|#
(in-package :cl-user)
(defpackage stoe.jobs
(:nicknames :jobs)
(:use :cl
:utils
:thread
:containers)
(:export :push-job))
(in-package :stoe.jobs)
(defstruct job
(handle -1 :read-only t)
(fun nil :read-only t)
(args nil :read-only t)
(callback nil :read-only t)
(assigned-thread -1)
(running nil)
(completed nil)
(result nil))
(defstruct command
(fun nil :read-only t)
(args nil :read-only t))
(defstruct (thread (:constructor %make-thread))
(id 0 :read-only t)
(thread nil)
(command-queue (make-safe-queue nil))
(last-updated-clock (make-clock)))
(defvar *thread-list* nil)
(defvar *job-list* (make-safe-queue nil))
(defvar *finished-job-list* (make-safe-queue nil))
(defvar *next-handle* -1)
(defun make-job-thread (fun id &optional args)
"Create a new thread."
(let* ((thread-object (%make-thread :id id))
(thread (make-thread fun :name (format nil "Thread ~a" id) :args (list thread-object))))
(setf (thread-thread thread-object) thread)
thread-object))
(defun initialize (&optional argv)
"Initialize the jobs module."
(format t "Initialize Job system~%")
(let ((thread-count (aif (member "-j" argv :test #'equal) (parse-integer (cadr it)) 1)))
(when (> thread-count 0)
(setf *thread-list*
(make-array (list thread-count) :initial-contents
(loop for i below thread-count
collect (let ((thread (make-job-thread #'thread-loop i)))
(push-command #'initialize-thread nil thread)
thread)))))))
(defun finalize ()
"Finalize the jobs module."
(format t "Finalize Job system~%")
(loop for i below (array-dimension *thread-list* 0)
do (push-command #'terminate-thread nil i))
(loop while (some (lambda (elt) (not (null elt))) *thread-list*)
do (update 0)))
(defun push-job (fun args callback)
"Create a new job using `fun' and `data' and push it into the job-list."
(let ((job (make-job :handle (incf *next-handle*) :fun fun :args args :callback callback)))
(enqueue *job-list* job)))
(defun push-command (fun args thread-or-id)
"Assign the command `fun' to the thread `thread-id'."
(let ((thread (or (and (thread-p thread-or-id) thread-or-id) (aref *thread-list* thread-or-id)))
(command (make-command :fun fun :args args)))
(when thread
(enqueue (thread-command-queue thread) command))))
(defun update (delta-time)
"Tick all running jobs to update their timer and retrieve their result value.
If a thread is available, assign a new job to it."
(declare (ignorable delta-time))
(loop for i below (array-dimension *thread-list* 0)
do (let ((thread (aref *thread-list* i)))
(when thread
(if (not (thread-alive-p (thread-thread thread)))
(finalize-thread thread)))))
(loop while (peek *finished-job-list*)
do (let ((job (dequeue *finished-job-list*)))
(when job
(if (and (not (job-running job)) (job-completed job))
(progn
(funcall (job-callback job) (job-result job))
(dequeue *job-list*)))))))
(defun initialize-thread (thread)
"Initialize a thread."
(format t "Initialize thread ~a~%" (thread-id thread)))
(defun finalize-thread (thread)
"Finalize a thread."
(format t "Finalize thread ~a~%" (thread-id thread))
(let ((result (join-thread (thread-thread thread) :default 'join-error)))
(unless (eq result 'join-error)
(setf (aref *thread-list* (thread-id thread)) nil))))
(defun terminate-thread (thread)
(declare (ignorable thread))
(throw 'exit-thread-loop nil))
(defun thread-loop (thread)
"Run the thread loop.
Poll the thread's job queue for a new job and update the thread status."
(catch 'exit-thread-loop
(loop for job = (dequeue *job-list*)
do (progn
(when job
(format t "Thread ~a: Running job ~a~%" (thread-id thread) (job-handle job))
(run-job job))
(update-thread thread)
(sleep 0.01)))))
(defun update-thread (thread)
"Update a thread status.
throw `exit-thread-loop' if the main thread has requested it to terminate."
(let ((command (dequeue (thread-command-queue thread))))
(when command
(apply (command-fun command) thread (command-args command)))))
(defun run-job (job)
(setf (job-running job) t)
(setf (job-result job) (apply (job-fun job) (job-args job)))
(setf (job-completed job) t)
(setf (job-running job) nil)
(enqueue *finished-job-list* job))

View file

@ -24,6 +24,8 @@
((:file "thread")
(:file "containers")
(:file "utils")
(:file "jobs"
:depends-on ("thread" "containers" "utils"))
(:file "stoe"
:depends-on ("utils")))))
:description "SaintOEngine - A 3d engine in common-lisp"