Introduce promises into the job module

This commit is contained in:
Renaud Casenave-Péré 2015-08-13 16:28:05 +02:00
parent 0c4b744e75
commit 0f0ed3a879
3 changed files with 40 additions and 17 deletions

View file

@ -4,8 +4,8 @@
|#
(uiop:define-package :stoe/core/file
(:use :cl :stoe/core/jobs)
(:export #:load-file))
(:use :cl :blackbird :stoe/core/utils :stoe/core/jobs)
(:export #:safe-read #:safe-read-from-string #:load-file))
(in-package :stoe/core/file)
(defun do-load-file (filepath type)
@ -16,8 +16,9 @@
(read-sequence buffer stream)
buffer))))
(defun load-file (filepath &key (sync nil) (type '(unsigned-byte 8)))
(defun load-file (filepath &key sync (type '(unsigned-byte 8)))
"Load the file specified by `filepath' asynchronally unless `sync' is true."
(if sync
(do-load-file filepath type)
(with-promise (resolve reject)
(resolve (do-load-file filepath type)))
(push-new-job #'do-load-file (list filepath type))))

View file

@ -1,10 +1,10 @@
#|
This file is a part of stoe project.
Copyright (c) 2014 Renaud Casenave-Péré (renaud@casenave-pere.fr)
Copyright (c) 2015 Renaud Casenave-Péré (renaud@casenave-pere.fr)
|#
(uiop:define-package :stoe/core/jobs
(:use :cl
(:use :cl :blackbird
:stoe/core/utils
:stoe/core/thread
:stoe/core/containers)
@ -24,7 +24,9 @@
(args :initarg :args :reader job-args
:documentation "The arguments given to the entry point function.")
(callback :initarg :callback :reader job-callback
:documentation "The function called upon completion of the job")))
:documentation "The function called upon completion of the job.")
(errback :initarg :errback :reader job-errback
:documentation "The function called when an error has occured regarding the job.")))
(defclass base-thread ()
((id :initarg :id :reader thread-id)
@ -50,18 +52,27 @@
(thread-id 0))
(defun make-job-id ()
(incf job-id))
(defun reset-job-ids ()
(setq job-id 0))
(defun make-thread-id ()
(incf thread-id)))
(incf thread-id))
(defun reset-thread-ids ()
(setq thread-id 0)))
(defun make-job (id fun args callback)
(make-instance 'job :id id :fun fun :args args :callback callback))
(defun thread-available-p ()
(> (length *thread-list*) 0))
(defun push-new-job (fun &optional args callback)
(let ((job (make-job (make-job-id) fun args callback)))
(with-lock-held (*job-lock*)
(enqueue *job-queue* job)
(condition-notify *job-waitqueue*))
job))
(defun make-job (id fun args callback errback)
(make-instance 'job :id id :fun fun :args args :callback callback :errback errback))
(defun push-new-job (fun &optional args)
(with-promise (resolve reject :resolve-fn resolver :reject-fn rejecter)
(let ((job (make-job (make-job-id) fun args resolver rejecter)))
(if (thread-available-p)
(with-lock-held (*job-lock*)
(enqueue *job-queue* job)
(condition-notify *job-waitqueue*))
(job-run job *current-thread-object*)))))
(defun make-base-thread (type id fun)
"Create a new thread."
@ -102,6 +113,10 @@
(defun initialize (&optional argv)
"Initialize the jobs module."
(format t "Initialize Job system~%")
(let ((main-thread (make-instance 'base-thread :id (make-thread-id))))
(with-slots (thread) main-thread
(setf thread (current-thread)))
(setq *current-thread-object* main-thread))
(let ((thread-count (get-command-line-option-number argv "-j" 0)))
(loop for i below thread-count
do (push-new-job-thread #'start-job-thread))))
@ -109,7 +124,13 @@
(defun finalize ()
"Finalize the jobs module."
(format t "Finalize Job system~%")
(update 0.0))
(update 0.0)
(assert (eq (length *thread-list*) 0))
(loop as job = (dequeue *job-queue*)
while job
do (funcall (job-errback job) 'job-canceled))
(reset-job-ids)
(reset-thread-ids))
(defun update (delta-time)
"Check finished threads and join them."

View file

@ -38,6 +38,7 @@
(funcall thunk))
:depends-on ("alexandria"
"bordeaux-threads"
"blackbird"
"cl-opengl"
"glop"
"stoe/maths/all"