|
|
|
@ -124,6 +124,81 @@ user input.")
|
|
|
|
|
phpinspect-pipeline-pause-time mx (make-condition-variable mx "phpinspect-pipeline-pause")))
|
|
|
|
|
(thread-yield))))
|
|
|
|
|
|
|
|
|
|
(define-inline phpinspect--read-pipeline-emission (&rest body)
|
|
|
|
|
(push 'progn body)
|
|
|
|
|
(inline-quote
|
|
|
|
|
(catch 'phpinspect-pipeline-emit
|
|
|
|
|
,body
|
|
|
|
|
nil)))
|
|
|
|
|
|
|
|
|
|
(defmacro phpinspect--run-as-pipeline-step (func-name queue consumer-queue pipeline-ctx &optional local-ctx)
|
|
|
|
|
(unless (symbolp func-name)
|
|
|
|
|
(error "Function name must be a symbol, got: %s" func-name))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(let ((thread-name (concat "phpinspect-pipeline-" (symbol-name func-name)))
|
|
|
|
|
(statement (list func-name))
|
|
|
|
|
(incoming (gensym "incoming"))
|
|
|
|
|
(outgoing (gensym "outgoing"))
|
|
|
|
|
(inc-queue (gensym "queue"))
|
|
|
|
|
(out-queue (gensym "queue"))
|
|
|
|
|
(context-sym (gensym "context"))
|
|
|
|
|
(continue-running (gensym "continue-running"))
|
|
|
|
|
(pctx-sym (gensym "pipeline-ctx"))
|
|
|
|
|
(incoming-end (gensym "incoming-end"))
|
|
|
|
|
(end (gensym "end")))
|
|
|
|
|
|
|
|
|
|
(when local-ctx
|
|
|
|
|
(setq statement (nconc statement (list context-sym))))
|
|
|
|
|
|
|
|
|
|
(setq statement (nconc statement (list incoming)))
|
|
|
|
|
|
|
|
|
|
`(let ((,inc-queue ,queue)
|
|
|
|
|
(,out-queue ,consumer-queue)
|
|
|
|
|
(,context-sym ,local-ctx)
|
|
|
|
|
(,pctx-sym ,pipeline-ctx))
|
|
|
|
|
(make-thread
|
|
|
|
|
(lambda ()
|
|
|
|
|
(let ((,continue-running t)
|
|
|
|
|
,incoming ,outgoing ,end ,incoming-end)
|
|
|
|
|
|
|
|
|
|
(phpinspect-pipeline--register-wakeup-function ,inc-queue)
|
|
|
|
|
(while ,continue-running
|
|
|
|
|
(condition-case err
|
|
|
|
|
(progn
|
|
|
|
|
(phpinspect-pipeline-pause)
|
|
|
|
|
(setq ,incoming (phpinspect-pipeline-receive ,inc-queue))
|
|
|
|
|
|
|
|
|
|
(if (phpinspect-pipeline-end-p ,incoming)
|
|
|
|
|
(progn
|
|
|
|
|
(setq ,incoming-end ,incoming)
|
|
|
|
|
(when (phpinspect-pipeline-end-value ,incoming)
|
|
|
|
|
(progn
|
|
|
|
|
(setq ,incoming (phpinspect-pipeline-end-value ,incoming)
|
|
|
|
|
,outgoing (phpinspect--read-pipeline-emission ,statement))
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,outgoing 'no-notify)))
|
|
|
|
|
|
|
|
|
|
(setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
|
|
|
|
|
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
|
|
|
|
|
(setq ,continue-running nil)
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,end))
|
|
|
|
|
|
|
|
|
|
;; Else
|
|
|
|
|
(setq ,outgoing (phpinspect--read-pipeline-emission ,statement))
|
|
|
|
|
(when (phpinspect-pipeline-end-p ,outgoing)
|
|
|
|
|
(setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
|
|
|
|
|
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
|
|
|
|
|
(setq ,continue-running nil))
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,outgoing)))
|
|
|
|
|
(phpinspect-pipeline-incoming)
|
|
|
|
|
(t (phpinspect--log "Pipeline thread errored: %s" err)
|
|
|
|
|
(setq ,end (phpinspect-make-pipeline-end :thread (current-thread) :error err))
|
|
|
|
|
(setq ,continue-running nil)
|
|
|
|
|
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,end))))))
|
|
|
|
|
,thread-name))))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(defun phpinspect--chain-pipeline-steps (steps start-queue end-queue ctx)
|
|
|
|
|
(let ((result (gensym "result"))
|
|
|
|
|
(incoming (gensym "incoming"))
|
|
|
|
@ -135,9 +210,9 @@ user input.")
|
|
|
|
|
|
|
|
|
|
(setq statement
|
|
|
|
|
(if (phpinspect--pipeline-step--context-var-name step)
|
|
|
|
|
`(,(phpinspect-pipeline-step-name name "create")
|
|
|
|
|
,incoming ,outgoing ,ctx-sym ,(phpinspect--pipeline-step--context-var-name step))
|
|
|
|
|
`(,(phpinspect-pipeline-step-name name "create") ,incoming ,outgoing ,ctx-sym)))
|
|
|
|
|
`(phpinspect--run-as-pipeline-step
|
|
|
|
|
,name ,incoming ,outgoing ,ctx-sym ,(phpinspect--pipeline-step--context-var-name step))
|
|
|
|
|
`(phpinspect--run-as-pipeline-step ,name ,incoming ,outgoing ,ctx-sym)))
|
|
|
|
|
(setq body (nconc body `(,(if steps
|
|
|
|
|
`(setq ,outgoing (phpinspect-make-queue))
|
|
|
|
|
`(setq ,outgoing ,end-queue))
|
|
|
|
@ -244,9 +319,11 @@ user input.")
|
|
|
|
|
(phpinspect-pipeline-ctx-close ,ctx-sym)
|
|
|
|
|
,result-sym)))))
|
|
|
|
|
|
|
|
|
|
(define-inline phpinspect-pipeline (seed-form &rest parameters)
|
|
|
|
|
(defmacro phpinspect-pipeline (seed-form &rest parameters)
|
|
|
|
|
(declare (indent defun))
|
|
|
|
|
(let ((result (gensym)) async macro-params)
|
|
|
|
|
(let ((result (gensym))
|
|
|
|
|
(async-sym (gensym))
|
|
|
|
|
async macro-params)
|
|
|
|
|
(while parameters
|
|
|
|
|
(setq key (pop parameters)
|
|
|
|
|
value (pop parameters))
|
|
|
|
@ -255,16 +332,15 @@ user input.")
|
|
|
|
|
(:async (setq async value))
|
|
|
|
|
(_ (setq macro-params (nconc macro-params (list key value))))))
|
|
|
|
|
|
|
|
|
|
(inline-quote
|
|
|
|
|
(if ,async
|
|
|
|
|
(make-thread
|
|
|
|
|
(lambda ()
|
|
|
|
|
(condition-case err
|
|
|
|
|
(let ((,result ,(append '(phpinspect--pipeline) (list seed-form) macro-params)))
|
|
|
|
|
(funcall ,async (or ,result 'phpinspect-pipeline-nil-result) nil))
|
|
|
|
|
(t (funcall ,async nil err))))
|
|
|
|
|
"phpinspect-pipeline-async")
|
|
|
|
|
,(append '(phpinspect--pipeline) (list seed-form) macro-params)))))
|
|
|
|
|
`(if-let ((,async-sym ,async))
|
|
|
|
|
(make-thread
|
|
|
|
|
(lambda ()
|
|
|
|
|
(condition-case err
|
|
|
|
|
(let ((,result (phpinspect--pipeline ,seed-form ,@macro-params)))
|
|
|
|
|
(funcall ,async-sym (or ,result 'phpinspect-pipeline-nil-result) nil))
|
|
|
|
|
(t (funcall ,async-sym nil err))))
|
|
|
|
|
"phpinspect-pipeline-async")
|
|
|
|
|
(phpinspect--pipeline ,seed-form ,@macro-params))))
|
|
|
|
|
|
|
|
|
|
(define-inline phpinspect-pipeline-receive (queue)
|
|
|
|
|
(inline-letevals (queue)
|
|
|
|
@ -297,92 +373,5 @@ user input.")
|
|
|
|
|
,queue (pop (phpinspect-pipeline-emission-collection ,emission)) ,no-notify))
|
|
|
|
|
(phpinspect-queue-enqueue ,queue ,emission ,no-notify))))))
|
|
|
|
|
|
|
|
|
|
(defmacro phpinspect-define-pipeline-step (name function-name)
|
|
|
|
|
(unless (symbolp name)
|
|
|
|
|
(error "name must be a symbol"))
|
|
|
|
|
|
|
|
|
|
(unless (symbolp function-name)
|
|
|
|
|
(error "function-name must be a symbol"))
|
|
|
|
|
|
|
|
|
|
(let ((execute-function (phpinspect-pipeline-step-name name "execute"))
|
|
|
|
|
(constructor-function (phpinspect-pipeline-step-name name "create")))
|
|
|
|
|
|
|
|
|
|
`(progn
|
|
|
|
|
(define-inline ,execute-function (input &optional context)
|
|
|
|
|
(if context
|
|
|
|
|
(inline-quote
|
|
|
|
|
(catch 'phpinspect-pipeline-emit
|
|
|
|
|
,(append `(,function-name) '(,context) '(,input))
|
|
|
|
|
nil))
|
|
|
|
|
(inline-quote
|
|
|
|
|
(catch 'phpinspect-pipeline-emit
|
|
|
|
|
,(append `(,function-name) '(,input))
|
|
|
|
|
nil))))
|
|
|
|
|
|
|
|
|
|
(define-inline ,constructor-function (queue consumer-queue pipeline-ctx &optional context)
|
|
|
|
|
(inline-letevals (queue consumer-queue context)
|
|
|
|
|
(let ((thread-name ,(concat "phpinspect-pipeline-" (symbol-name name)))
|
|
|
|
|
(statement (list (quote ,execute-function))))
|
|
|
|
|
,@(list
|
|
|
|
|
'(let ((incoming (gensym "incoming"))
|
|
|
|
|
(outgoing (gensym "outgoing"))
|
|
|
|
|
(inc-queue (gensym "queue"))
|
|
|
|
|
(out-queue (gensym "queue"))
|
|
|
|
|
(context-sym (gensym "context"))
|
|
|
|
|
(continue-running (gensym "continue-running"))
|
|
|
|
|
(pctx-sym (gensym "pipeline-ctx"))
|
|
|
|
|
(incoming-end (gensym "incoming-end"))
|
|
|
|
|
(end (gensym "end")))
|
|
|
|
|
|
|
|
|
|
(setq statement (nconc statement (list incoming)))
|
|
|
|
|
(unless (and (inline-const-p context) (not (inline-const-val context)))
|
|
|
|
|
(setq statement (nconc statement (list context-sym))))
|
|
|
|
|
|
|
|
|
|
(inline-quote
|
|
|
|
|
(let ((,inc-queue ,queue)
|
|
|
|
|
(,out-queue ,consumer-queue)
|
|
|
|
|
(,context-sym ,context)
|
|
|
|
|
(,pctx-sym ,pipeline-ctx))
|
|
|
|
|
(make-thread
|
|
|
|
|
(lambda ()
|
|
|
|
|
(let ((,continue-running t)
|
|
|
|
|
,incoming ,outgoing ,end ,incoming-end)
|
|
|
|
|
|
|
|
|
|
(phpinspect-pipeline--register-wakeup-function ,inc-queue)
|
|
|
|
|
(while ,continue-running
|
|
|
|
|
(condition-case err
|
|
|
|
|
(progn
|
|
|
|
|
(phpinspect-pipeline-pause)
|
|
|
|
|
(setq ,incoming (phpinspect-pipeline-receive ,inc-queue))
|
|
|
|
|
|
|
|
|
|
(if (phpinspect-pipeline-end-p ,incoming)
|
|
|
|
|
(progn
|
|
|
|
|
(setq ,incoming-end ,incoming)
|
|
|
|
|
(when (phpinspect-pipeline-end-value ,incoming)
|
|
|
|
|
(progn
|
|
|
|
|
(setq ,incoming (phpinspect-pipeline-end-value ,incoming)
|
|
|
|
|
,outgoing ,statement)
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,outgoing 'no-notify)))
|
|
|
|
|
|
|
|
|
|
(setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
|
|
|
|
|
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
|
|
|
|
|
(setq ,continue-running nil)
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,end))
|
|
|
|
|
|
|
|
|
|
;; Else
|
|
|
|
|
(setq ,outgoing ,statement)
|
|
|
|
|
(when (phpinspect-pipeline-end-p ,outgoing)
|
|
|
|
|
(setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
|
|
|
|
|
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
|
|
|
|
|
(setq ,continue-running nil))
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,outgoing)))
|
|
|
|
|
(phpinspect-pipeline-incoming)
|
|
|
|
|
(t (phpinspect--log "Pipeline thread errored: %s" err)
|
|
|
|
|
(setq ,end (phpinspect-make-pipeline-end :thread (current-thread) :error err))
|
|
|
|
|
(setq ,continue-running nil)
|
|
|
|
|
(phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
|
|
|
|
|
(phpinspect-pipeline--enqueue ,out-queue ,end))))))
|
|
|
|
|
,thread-name)))))))))))
|
|
|
|
|
|
|
|
|
|
(provide 'phpinspect-pipeline)
|
|
|
|
|
;;; phpinspect-pipeline.el ends here
|
|
|
|
|