From e8830b9ddcbae49d083a6153da3f936be472a653 Mon Sep 17 00:00:00 2001 From: Sergey Sergeev Date: Thu, 25 Jan 2024 17:15:08 -0800 Subject: [PATCH] WIP fix recover --- src/test/StatedWorkflow.test.js | 1 + src/workflow/StatedWorkflow.js | 41 ++++++++++++++------------------- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/src/test/StatedWorkflow.test.js b/src/test/StatedWorkflow.test.js index b38446c..09b20d1 100644 --- a/src/test/StatedWorkflow.test.js +++ b/src/test/StatedWorkflow.test.js @@ -555,6 +555,7 @@ test("recover incomplete workflow - step 1 is incomplete - should rerun steps 1 // keep steps execution logs for debugging tp.options = {'keepLogs': true} await tp.initialize(); + console.log(tp.output); const {step0, step1, step2} = tp.output; expect(step0.log['1697402819332-9q6gg'].end).toBeDefined(); expect(step1.log['1697402819332-9q6gg'].start).toBeDefined(); diff --git a/src/workflow/StatedWorkflow.js b/src/workflow/StatedWorkflow.js index bcde15a..9a7bb47 100644 --- a/src/workflow/StatedWorkflow.js +++ b/src/workflow/StatedWorkflow.js @@ -73,6 +73,7 @@ export class StatedWorkflow { const tp = new TemplateProcessor(template); tp.functionGenerators.set("serial", StatedWorkflow.serialGenerator); tp.functionGenerators.set("parallel", StatedWorkflow.parallelGenerator); + tp.functionGenerators.set("recover", StatedWorkflow.recoverGenerator); tp.logLevel = logLevel.ERROR; //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default tp.onInitialize = WorkflowDispatcher.clear; //must remove all subscribers when template reinitialized return new StatedWorkflow(tp, persistence); @@ -422,24 +423,6 @@ export class StatedWorkflow { return []; } - static async findSerialStepDependenciesInTemplate(metaInf){ - return await StatedWorkflow.findStepDependenciesInTemplate(metaInf, "**[procedure.value='serial']"); - } - - static async findParallelStepDependenciesInTemplate(metaInf){ - return await StatedWorkflow.findStepDependenciesInTemplate(metaInf, "**[procedure.value='parallel']"); - } - // - // static async findSerialStepDependenciesInTemplate(metaInf){ - // const ast = metaInf.compiledExpr__.ast(); - // let depFinder = new DependencyFinder(ast); - // depFinder = await depFinder.withAstFilterExpression("**[procedure.value='serial']"); - // if(depFinder.ast){ - // return depFinder.findDependencies().map(jp.compile) - // } - // return []; - // } - /** * make sure that if the serial function has numSteps, that have located storage for each of the steps in the * document. @@ -513,7 +496,7 @@ export class StatedWorkflow { } // This function is called by the template processor to execute an array of steps in parallel - static async parallel(input, steps, context = {}, resolvedJsonPointers = {}, tp = undefined) { + static async parallel(input, stepJsons, context = {}, resolvedJsonPointers = {}, tp = undefined) { let {workflowInvocation} = context; if (workflowInvocation === undefined) { @@ -521,8 +504,8 @@ export class StatedWorkflow { } let promises = []; - for (let stepJson of steps) { - let step = new Step(stepJson, StatedWorkflow.persistence); + for (let i = 0; i < stepJsons.length; i++) { + let step = new Step(stepJsons[i], StatedWorkflow.persistence, resolvedJsonPointers?.[i], tp); const promise = StatedWorkflow.runStep(workflowInvocation, step, input) .then(result => { // step.output.results.push(result); @@ -537,7 +520,7 @@ export class StatedWorkflow { let result = await Promise.all(promises); - // if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps); + if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps); return result; } @@ -568,8 +551,18 @@ export class StatedWorkflow { ); } - static async recover(stepJson){ - let step = new Step(stepJson); + static async recoverGenerator(metaInf, tp) { + let parallelDeps = {}; + return async (step, context) => { + const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'recover'); //fixme todo we should avoid doing this for every jsonata evaluation + StatedWorkflow.validateStepPointers(resolvedJsonPointers, [step], metaInf, 'recover'); + return StatedWorkflow.recover(step, context, resolvedJsonPointers?.[0], tp); + } + } + + + static async recover(stepJson, context, resolvedJsonPointer, tp){ + let step = new Step(stepJson, StatedWorkflow.persistence, resolvedJsonPointer, tp); for (let workflowInvocation of step.log.getInvocations()){ await StatedWorkflow.runStep(workflowInvocation, step); }