From 7fc491ed77eece108734eda5825791831f25c05f Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Thu, 25 Jan 2024 14:44:38 -0800 Subject: [PATCH] make parallel function a generator --- src/workflow/StatedWorkflow.js | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index fb0a1e7..bcde15a 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -48,7 +48,7 @@ export class StatedWorkflow { static FUNCTIONS = { "id": StatedWorkflow.generateDateAndTimeBasedID.bind(this), // "serial": StatedWorkflow.serial.bind(this), - "parallel": StatedWorkflow.parallel.bind(this), + // "parallel": StatedWorkflow.parallel.bind(this), "onHttp": StatedWorkflow.onHttp.bind(this), "subscribe": StatedWorkflow.subscribe.bind(this), "publish": StatedWorkflow.publish.bind(this), @@ -72,6 +72,7 @@ export class StatedWorkflow { TemplateProcessor.DEFAULT_FUNCTIONS = {...TemplateProcessor.DEFAULT_FUNCTIONS, ...StatedWorkflow.FUNCTIONS}; const tp = new TemplateProcessor(template); tp.functionGenerators.set("serial", StatedWorkflow.serialGenerator); + tp.functionGenerators.set("parallel", StatedWorkflow.parallelGenerator); tp.logLevel = logLevel.ERROR; //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default tp.onInitialize = WorkflowDispatcher.clear; //must remove all subscribers when template reinitialized return new StatedWorkflow(tp, persistence); @@ -397,15 +398,16 @@ export class StatedWorkflow { * @param steps * @param metaInf */ - static validateStepPointers(resolvedJsonPointers, steps, metaInf) { + static validateStepPointers(resolvedJsonPointers, steps, metaInf, procedureName) { if (resolvedJsonPointers.length !== steps.length) { throw new Error(`At ${metaInf.jsonPointer__}, - '$serial(...)' was passed ${steps.length} steps, but found ${resolvedJsonPointers.length} step locations in the document.`); + '${$procedureName}(...)' was passed ${steps.length} steps, but found ${resolvedJsonPointers.length} step locations in the document.`); } } - static async resolveEachStepToOneLocationInTemplate(metaInf, tp){ - const jsonPointers = await StatedWorkflow.findSerialStepDependenciesInTemplate(metaInf); + static async resolveEachStepToOneLocationInTemplate(metaInf, tp, procedureName){ + const filterExpression = `**[procedure.value='${procedureName}']`; + const jsonPointers = await StatedWorkflow.findStepDependenciesInTemplate(metaInf, filterExpression); const resolvedPointers = StatedWorkflow.drillIntoStepArrays(jsonPointers, tp); return resolvedPointers; } @@ -469,8 +471,8 @@ export class StatedWorkflow { static async serialGenerator(metaInf, tp) { return async (input, steps, context) => { - const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp); //fixme todo we should avoid doing this for every jsonata evaluation - StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf); + const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'serial'); //fixme todo we should avoid doing this for every jsonata evaluation + StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'serial'); return serial(input, steps, context, resolvedJsonPointers, tp); } @@ -503,19 +505,19 @@ export class StatedWorkflow { let parallelDeps = {}; return async (input, steps, context) => { - const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp); //fixme todo we should avoid doing this for every jsonata evaluation - StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf); + const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'parallel'); //fixme todo we should avoid doing this for every jsonata evaluation + StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'parallel'); - return serial(input, steps, context, resolvedJsonPointers, tp); + return parallel(input, steps, context, resolvedJsonPointers, tp); } } // This function is called by the template processor to execute an array of steps in parallel - static async parallel(input, steps, context = {}) { + static async parallel(input, steps, context = {}, resolvedJsonPointers = {}, tp = undefined) { let {workflowInvocation} = context; if (workflowInvocation === undefined) { - workflowInvocation = this.generateDateAndTimeBasedID(); + workflowInvocation = StatedWorkflow.generateDateAndTimeBasedID(); } let promises = [];