@ -22,8 +22,8 @@
;;; Commentary:
;;; Code:
( require 'phpinspect-worker )
( require 'phpinspect-queue )
( require 'phpinspect-util )
( define-error 'phpinspect-pipeline-incoming " Signal for incoming pipeline data " )
( define-error 'phpinspect-pipeline-error " Signal for pipeline errors " )
@ -31,13 +31,18 @@
( cl-defstruct ( phpinspect-pipeline-end ( :constructor phpinspect-make-pipeline-end ) )
( value nil
:type any )
( error nil )
( thread nil
:type thread ) )
( cl-defstruct ( phpinspect-pipeline-emission ( :constructor phpinspect-make-pipeline-emission ) )
( collection nil
:type list ) )
( cl-defstruct ( phpinspect-pipeline-thread ( :constructor phpinspect-make-pipeline-thread ) )
( in-queue nil
:type phpinspect-queue )
( ended nil
( end nil
:type boolean ) )
( cl-defstruct ( phpinspect-pipeline-ctx ( :constructor phpinspect-make-pipeline-ctx ) )
@ -54,13 +59,14 @@
( cl-defmethod phpinspect-pipeline-ctx-register-end ( ( ctx phpinspect-pipeline-ctx ) ( end phpinspect-pipeline-end ) )
( let ( ( thread ( phpinspect-pipeline-ctx-get-thread ctx ( phpinspect-pipeline-end-thread end ) ) ) )
( setf ( phpinspect-pipeline-thread-end ed thread ) t ) ) )
( setf ( phpinspect-pipeline-thread-end thread ) end ) ) )
( cl-defmethod phpinspect-pipeline-ctx-close ( ( ctx phpinspect-pipeline-ctx ) )
( let ( errors err end ed thread-live )
( let ( errors err end thread-live )
( dolist ( thread ( phpinspect-pipeline-ctx-threads ctx ) )
( setq err ( thread-last-error ( car thread ) )
ended ( phpinspect-pipeline-thread-ended ( cdr thread ) )
( setq end ( phpinspect-pipeline-thread-end ( cdr thread ) )
err ( or ( thread-last-error ( car thread ) )
( and end ( phpinspect-pipeline-end-error end ) ) )
thread-live ( thread-live-p ( car thread ) ) )
( when thread-live
@ -70,12 +76,11 @@
( setq errors ( nconc errors ( list ( format " Thread %s is still running when pipeline is closing "
( thread-name ( car thread ) ) ) ) ) ) ) )
( when ( thread-last-error ( car thread ) )
( when err
( setq errors ( nconc errors ( list ( format " Thread %s signaled error: %s "
( thread-name ( car thread ) )
( thread-last- error ( car thread ) ) ) ) ) ) )
( unless end ed
err) ) ) ) )
( unless end
( setq errors ( nconc errors ( list ( format " Thread %s never ended "
( thread-name ( car thread ) ) ) ) ) ) )
@ -89,6 +94,11 @@
( defmacro phpinspect-pipeline-emit ( data )
` ( throw 'phpinspect-pipeline-emit , data ) )
( defmacro phpinspect-pipeline-emit-all ( collection )
` ( throw 'phpinspect-pipeline-emit
( phpinspect-make-pipeline-emission
:collection , collection ) ) )
( defmacro phpinspect-pipeline-end ( &optional value )
( if value
` ( throw 'phpinspect-pipeline-emit
@ -104,19 +114,6 @@
( phpinspect-thread-pause 1 mx ( make-condition-variable mx " phpinspect-pipeline-pause " ) ) )
( thread-yield ) ) ) )
( defmacro phpinspect-pipeline-generator ( queue &rest body )
( declare ( indent 1 ) )
( let ( ( result-sym ( gensym ) )
( queue-sym ( gensym ) ) )
` ( let ( , result-sym
( , queue-sym , queue ) )
( while ( setq , result-sym ( progn ,@ body ) )
( phpinspect-queue-enqueue , queue-sym , result-sym )
( phpinspect-pipeline-pause ) )
( phpinspect-queue-enqueue , queue-sym ( phpinspect-make-pipeline-end :thread ( current-thread ) ) ) ) ) )
( defun phpinspect--chain-pipeline-steps ( steps start-queue end-queue ctx )
( let ( ( result ( gensym " result " ) )
( incoming ( gensym " incoming " ) )
@ -154,47 +151,38 @@
:documentation
" The name of this step " ) )
( defmacro phpinspect-pipeline ( &rest parameters )
( let ( key value steps body let-vars )
( catch 'break
( while parameters
( setq key ( pop parameters )
value ( pop parameters ) )
( pcase key
( :into
( let ( ( parameters )
( name )
( construct-params ) )
( if ( listp value )
( progn
( setq name ( car value )
parameters ( cdr value ) ) )
( setq name value ) )
( unless ( symbolp name )
( error " Step name should be a symbol " ) )
( let ( key value )
( while parameters
( setq key ( pop parameters )
value ( pop parameters ) )
( when ( eq :with-context key )
( setq value ` ( quote , value ) ) )
( setq key ( intern ( string-replace " :with- " " : " ( symbol-name key ) ) ) )
( setq construct-params ( nconc construct-params ( list key value ) ) ) ) )
( push ( eval ` ( phpinspect--make-pipeline-step ,@ construct-params :name ( quote , name ) ) )
steps ) ) )
( _ ( if ( keywordp key )
( error " unexpected key %s " key )
( setq body ` ( , key ) )
( throw 'break nil ) ) ) ) ) )
( when value
( setq body ( nconc body ( list value ) ) ) )
( when parameters
( setq body ( nconc body parameters ) ) )
( defmacro phpinspect--pipeline ( seed-form &rest parameters )
( let ( key value steps let-vars )
( while parameters
( setq key ( pop parameters )
value ( pop parameters ) )
( pcase key
( :into
( let ( ( parameters )
( name )
( construct-params ) )
( if ( listp value )
( progn
( setq name ( car value )
parameters ( cdr value ) ) )
( setq name value ) )
( unless ( symbolp name )
( error " Step name should be a symbol " ) )
( let ( key value )
( while parameters
( setq key ( pop parameters )
value ( pop parameters ) )
( when ( eq :with-context key )
( setq value ` ( quote , value ) ) )
( setq key ( intern ( string-replace " :with- " " : " ( symbol-name key ) ) ) )
( setq construct-params ( nconc construct-params ( list key value ) ) ) ) )
( push ( eval ` ( phpinspect--make-pipeline-step ,@ construct-params :name ( quote , name ) ) )
steps ) ) )
( _ ( error " unexpected key %s " key ) ) ) )
( setq steps ( nreverse steps ) )
@ -213,42 +201,59 @@
( result-sym ( gensym ) )
( collecting-sym ( gensym ) ) )
` ( progn
( when ( eq main-thread ( current-thread ) )
( error " Pipelines should not run in the main thread " ) )
( let* ( ,@ let-vars
( , ctx-sym ( phpinspect-make-pipeline-ctx ) )
( , queue-sym ( phpinspect-make-queue ) )
( , end-queue-sym ( phpinspect-make-queue ) )
( , collecting-sym t )
, recv-sym , result-sym )
, ( phpinspect--chain-pipeline-steps steps queue-sym end-queue-sym ctx-sym )
( phpinspect-pipeline-generator , queue-sym
,@ body )
( while , collecting-sym
( ignore-error 'phpinspect-pipeline-incoming
( progn
( phpinspect-pipeline--register-wakeup-function , end-queue-sym )
( while ( not ( phpinspect-pipeline-end-p
( setq , recv-sym ( phpinspect-pipeline-receive , end-queue-sym ) ) ) )
( setq , result-sym ( nconc , result-sym ( list , recv-sym ) ) ) )
( setq , collecting-sym nil ) ) ) )
( phpinspect-pipeline-ctx-close , ctx-sym )
, result-sym ) ) ) ) )
( defmacro phpinspect-pipeline-async ( callback &rest parameters )
( declare ( indent 1 ) )
` ( make-thread
( lambda ( )
( condition-case err
( let ( ( result ( phpinspect-pipeline ,@ parameters ) ) )
( funcall , callback result nil ) )
( t ( funcall , callback nil err ) ) ) )
" phpinspect-pipeline-async " ) )
( when ( eq main-thread ( current-thread ) )
( error " Pipelines should not run in the main thread " ) )
( let* ( ,@ let-vars
( , ctx-sym ( phpinspect-make-pipeline-ctx ) )
( , queue-sym ( phpinspect-make-queue ) )
( , end-queue-sym ( phpinspect-make-queue ) )
( , collecting-sym t )
, recv-sym , result-sym )
, ( phpinspect--chain-pipeline-steps steps queue-sym end-queue-sym ctx-sym )
( phpinspect-pipeline--enqueue
, queue-sym
( phpinspect-make-pipeline-emission :collection , seed-form ) 'no-notify )
( phpinspect-pipeline--enqueue
, queue-sym ( phpinspect-make-pipeline-end :thread ( current-thread ) ) )
( while , collecting-sym
( ignore-error 'phpinspect-pipeline-incoming
( progn
( phpinspect-pipeline--register-wakeup-function , end-queue-sym )
( while ( not ( phpinspect-pipeline-end-p
( setq , recv-sym ( phpinspect-pipeline-receive , end-queue-sym ) ) ) )
( setq , result-sym ( nconc , result-sym ( list , recv-sym ) ) ) )
( setq , collecting-sym nil ) ) ) )
( phpinspect-pipeline-ctx-close , ctx-sym )
, result-sym ) ) ) ) )
( define-inline phpinspect-pipeline ( seed-form &rest parameters )
( declare ( indent defun ) )
( let ( ( result ( gensym ) ) async macro-params )
( while parameters
( setq key ( pop parameters )
value ( pop parameters ) )
( pcase key
( :async ( setq async value ) )
( _ ( setq macro-params ( nconc macro-params ( list key value ) ) ) ) ) )
( inline-quote
( if , async
( make-thread
( lambda ( )
( condition-case err
( let ( ( , result , ( append ' ( phpinspect--pipeline ) ( list seed-form ) macro-params ) ) )
( funcall , async , result nil ) )
( t ( funcall , async nil err ) ) ) )
" phpinspect-pipeline-async " )
, ( append ' ( phpinspect--pipeline ) ( list seed-form ) macro-params ) ) ) ) )
( define-inline phpinspect-pipeline-receive ( queue )
( inline-letevals ( queue )
@ -267,6 +272,21 @@
( setf ( phpinspect-queue-subscription , queue )
( lambda ( ) ( thread-signal thread 'phpinspect-pipeline-incoming nil ) ) ) ) ) )
( define-inline phpinspect-pipeline--enqueue ( queue emission &optional no-notify )
( inline-letevals ( queue emission no-notify )
( inline-quote
( if ( and ( phpinspect-pipeline-emission-p , emission )
( phpinspect-pipeline-emission-collection , emission ) )
( progn
( while ( cdr ( phpinspect-pipeline-emission-collection , emission ) )
( phpinspect-queue-enqueue
, queue ( pop ( phpinspect-pipeline-emission-collection , emission ) )
, no-notify ) )
( phpinspect-queue-enqueue
, queue ( pop ( phpinspect-pipeline-emission-collection , emission ) ) , no-notify ) )
( phpinspect-queue-enqueue , queue , emission , no-notify ) ) ) ) )
( defmacro phpinspect-define-pipeline-step ( name function-name )
( unless ( symbolp name )
( error " name must be a symbol " ) )
@ -335,12 +355,12 @@
( progn
( setq , incoming ( phpinspect-pipeline-end-value , incoming )
, outgoing , statement )
( phpinspect- queue -enqueue , out-queue , outgoing 'no-notify ) ) )
( phpinspect- pipeline- -enqueue , out-queue , outgoing 'no-notify ) ) )
( setq , end ( phpinspect-make-pipeline-end :thread ( current-thread ) ) )
( phpinspect-pipeline-ctx-register-end , pctx-sym , end )
( setq , continue-running nil )
( phpinspect- queue -enqueue , out-queue , end ) )
( phpinspect- pipeline- -enqueue , out-queue , end ) )
;; Else
( setq , outgoing , statement )
@ -348,16 +368,16 @@
( setq , end ( phpinspect-make-pipeline-end :thread ( current-thread ) ) )
( phpinspect-pipeline-ctx-register-end , pctx-sym , end )
( setq , continue-running nil ) )
( phpinspect- queue -enqueue , out-queue , outgoing ) )
( phpinspect- pipeline- -enqueue , out-queue , outgoing ) )
( when , end
( throw 'phpinspect-pipeline-break nil ) ) ) ) )
( phpinspect-pipeline-incoming )
( t ( message " Pipeline thread errored: %s " err )
( t ( phpinspect--log " Pipeline thread errored: %s " err )
( setq , end ( phpinspect-make-pipeline-end :thread ( current-thread ) :error err ) )
( setq , continue-running nil )
( phpinspect-pipeline-ctx-register-end
, pctx-sym
( phpinspect-make-pipeline-end :thread ( current-thread ) ) ) ) ) ) ) )
( phpinspect-pipeline-ctx-register-end , pctx-sym , end )
( phpinspect-pipeline--enqueue , out-queue , end ) ) ) ) ) )
, thread-name ) ) ) ) ) ) ) ) ) ) )
( provide 'phpinspect-pipeline )