Skip to content

Commit

Permalink
WIP fix recover
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Sergeev committed Jan 26, 2024
1 parent 7fc491e commit e8830b9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ test("recover incomplete workflow - step 1 is incomplete - should rerun steps 1
// keep steps execution logs for debugging
tp.options = {'keepLogs': true}
await tp.initialize();
console.log(tp.output);
const {step0, step1, step2} = tp.output;
expect(step0.log['1697402819332-9q6gg'].end).toBeDefined();
expect(step1.log['1697402819332-9q6gg'].start).toBeDefined();
Expand Down
41 changes: 17 additions & 24 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class StatedWorkflow {
const tp = new TemplateProcessor(template);
tp.functionGenerators.set("serial", StatedWorkflow.serialGenerator);
tp.functionGenerators.set("parallel", StatedWorkflow.parallelGenerator);
tp.functionGenerators.set("recover", StatedWorkflow.recoverGenerator);
tp.logLevel = logLevel.ERROR; //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default
tp.onInitialize = WorkflowDispatcher.clear; //must remove all subscribers when template reinitialized
return new StatedWorkflow(tp, persistence);
Expand Down Expand Up @@ -422,24 +423,6 @@ export class StatedWorkflow {
return [];
}

static async findSerialStepDependenciesInTemplate(metaInf){
return await StatedWorkflow.findStepDependenciesInTemplate(metaInf, "**[procedure.value='serial']");
}

static async findParallelStepDependenciesInTemplate(metaInf){
return await StatedWorkflow.findStepDependenciesInTemplate(metaInf, "**[procedure.value='parallel']");
}
//
// static async findSerialStepDependenciesInTemplate(metaInf){
// const ast = metaInf.compiledExpr__.ast();
// let depFinder = new DependencyFinder(ast);
// depFinder = await depFinder.withAstFilterExpression("**[procedure.value='serial']");
// if(depFinder.ast){
// return depFinder.findDependencies().map(jp.compile)
// }
// return [];
// }

/**
* make sure that if the serial function has numSteps, that have located storage for each of the steps in the
* document.
Expand Down Expand Up @@ -513,16 +496,16 @@ export class StatedWorkflow {
}

// This function is called by the template processor to execute an array of steps in parallel
static async parallel(input, steps, context = {}, resolvedJsonPointers = {}, tp = undefined) {
static async parallel(input, stepJsons, context = {}, resolvedJsonPointers = {}, tp = undefined) {
let {workflowInvocation} = context;

if (workflowInvocation === undefined) {
workflowInvocation = StatedWorkflow.generateDateAndTimeBasedID();
}

let promises = [];
for (let stepJson of steps) {
let step = new Step(stepJson, StatedWorkflow.persistence);
for (let i = 0; i < stepJsons.length; i++) {
let step = new Step(stepJsons[i], StatedWorkflow.persistence, resolvedJsonPointers?.[i], tp);
const promise = StatedWorkflow.runStep(workflowInvocation, step, input)
.then(result => {
// step.output.results.push(result);
Expand All @@ -537,7 +520,7 @@ export class StatedWorkflow {

let result = await Promise.all(promises);

// if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps);
if (!tp.options.keepLogs) await StatedWorkflow.deleteStepsLogs(workflowInvocation, steps);

return result;
}
Expand Down Expand Up @@ -568,8 +551,18 @@ export class StatedWorkflow {
);
}

static async recover(stepJson){
let step = new Step(stepJson);
static async recoverGenerator(metaInf, tp) {
let parallelDeps = {};
return async (step, context) => {
const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'recover'); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, [step], metaInf, 'recover');
return StatedWorkflow.recover(step, context, resolvedJsonPointers?.[0], tp);
}
}


static async recover(stepJson, context, resolvedJsonPointer, tp){
let step = new Step(stepJson, StatedWorkflow.persistence, resolvedJsonPointer, tp);
for (let workflowInvocation of step.log.getInvocations()){
await StatedWorkflow.runStep(workflowInvocation, step);
}
Expand Down

0 comments on commit e8830b9

Please sign in to comment.