Skip to content

Commit

Permalink
make parallel function a generator
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Sergeev committed Jan 25, 2024
1 parent 7abb045 commit 7fc491e
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class StatedWorkflow {
static FUNCTIONS = {
"id": StatedWorkflow.generateDateAndTimeBasedID.bind(this),
// "serial": StatedWorkflow.serial.bind(this),
"parallel": StatedWorkflow.parallel.bind(this),
// "parallel": StatedWorkflow.parallel.bind(this),
"onHttp": StatedWorkflow.onHttp.bind(this),
"subscribe": StatedWorkflow.subscribe.bind(this),
"publish": StatedWorkflow.publish.bind(this),
Expand All @@ -72,6 +72,7 @@ export class StatedWorkflow {
TemplateProcessor.DEFAULT_FUNCTIONS = {...TemplateProcessor.DEFAULT_FUNCTIONS, ...StatedWorkflow.FUNCTIONS};
const tp = new TemplateProcessor(template);
tp.functionGenerators.set("serial", StatedWorkflow.serialGenerator);
tp.functionGenerators.set("parallel", StatedWorkflow.parallelGenerator);
tp.logLevel = logLevel.ERROR; //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default
tp.onInitialize = WorkflowDispatcher.clear; //must remove all subscribers when template reinitialized
return new StatedWorkflow(tp, persistence);
Expand Down Expand Up @@ -397,15 +398,16 @@ export class StatedWorkflow {
* @param steps
* @param metaInf
*/
static validateStepPointers(resolvedJsonPointers, steps, metaInf) {
static validateStepPointers(resolvedJsonPointers, steps, metaInf, procedureName) {
if (resolvedJsonPointers.length !== steps.length) {
throw new Error(`At ${metaInf.jsonPointer__},
'$serial(...)' was passed ${steps.length} steps, but found ${resolvedJsonPointers.length} step locations in the document.`);
'${$procedureName}(...)' was passed ${steps.length} steps, but found ${resolvedJsonPointers.length} step locations in the document.`);
}
}

static async resolveEachStepToOneLocationInTemplate(metaInf, tp){
const jsonPointers = await StatedWorkflow.findSerialStepDependenciesInTemplate(metaInf);
static async resolveEachStepToOneLocationInTemplate(metaInf, tp, procedureName){
const filterExpression = `**[procedure.value='${procedureName}']`;
const jsonPointers = await StatedWorkflow.findStepDependenciesInTemplate(metaInf, filterExpression);
const resolvedPointers = StatedWorkflow.drillIntoStepArrays(jsonPointers, tp);
return resolvedPointers;
}
Expand Down Expand Up @@ -469,8 +471,8 @@ export class StatedWorkflow {
static async serialGenerator(metaInf, tp) {
return async (input, steps, context) => {

const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf);
const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'serial'); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'serial');

return serial(input, steps, context, resolvedJsonPointers, tp);
}
Expand Down Expand Up @@ -503,19 +505,19 @@ export class StatedWorkflow {
let parallelDeps = {};
return async (input, steps, context) => {

const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf);
const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'parallel'); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'parallel');

return serial(input, steps, context, resolvedJsonPointers, tp);
return parallel(input, steps, context, resolvedJsonPointers, tp);
}
}

// This function is called by the template processor to execute an array of steps in parallel
static async parallel(input, steps, context = {}) {
static async parallel(input, steps, context = {}, resolvedJsonPointers = {}, tp = undefined) {
let {workflowInvocation} = context;

if (workflowInvocation === undefined) {
workflowInvocation = this.generateDateAndTimeBasedID();
workflowInvocation = StatedWorkflow.generateDateAndTimeBasedID();
}

let promises = [];
Expand Down

0 comments on commit 7fc491e

Please sign in to comment.