Skip to content

Commit

Permalink
fix parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Sergeev committed Nov 17, 2023
1 parent bfe066d commit 7164548
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 47 deletions.
8 changes: 5 additions & 3 deletions example/wfDownloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ start$: $subscribe(subscribeParams)
name: testConcurrentFetch
subscribeParams: #parameters for subscribing to a cloud event
to: ../${downloader$}
parallelism: 500
testData: ${[1..1000]}
source: data
parallelism: 5
testData: ${[1..10]}
source: cloudEvent
client:
type: test

downloader$: |
function($noop){
Expand Down
42 changes: 11 additions & 31 deletions src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -447,12 +447,11 @@ test("workflow perf", async () => {
expect(Object.keys(tp.output.step2.log).length).toEqual(10000);
}, 10000);

/*
test("webserver", async () => {
console.time("workflow perf total time"); // Start the timer with a label

// Load the YAML from the file
const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'experimental', 'wfHttp01.yaml');
const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'wfHttp01.yaml');
console.time("Read YAML file"); // Start the timer for reading the file
const templateYaml = fs.readFileSync(yamlFilePath, 'utf8');
console.timeEnd("Read YAML file"); // End the timer for reading the file
Expand All @@ -475,7 +474,7 @@ test("downloaders", async () => {
console.time("workflow perf total time"); // Start the timer with a label

// Load the YAML from the file
const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'experimental', 'wfDownloads.yaml');
const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'wfDownloads.yaml');
console.time("Read YAML file"); // Start the timer for reading the file
const templateYaml = fs.readFileSync(yamlFilePath, 'utf8');
console.timeEnd("Read YAML file"); // End the timer for reading the file
Expand All @@ -494,53 +493,34 @@ test("downloaders", async () => {
console.timeEnd("workflow perf total time"); // End the total time timer
}, 10000);

*/


/*
test("test all", async () => {
const tp = await StatedWorkflow.newWorkflow({
"startEven": "tada",
"startEvent": "tada",
// a,b,c,d are workflow stages, which include a callable stated expression, and an output object to
// store the results of the expression and any errors that occur
// it will allow workflow stages to be skipped if they have already been run or stop processing next
// stages if the current stage fails.
"a": {
"function": "${ function($in) { ( $console.log($in); [$in, 'a'] ~> $join('->') )} }",
"output": {
"results": [],
"errors": {}
}
"function": "${ function($in) { ( $console.log($in); [$in, 'a'] ~> $join('->') )} }"
},
"b": {
"function": "${ function($in) { [$in, 'b'] ~> $join('->') } }",
"output": {
"results": [],
"errors": {}
}
"function": "${ function($in) { [$in, 'b'] ~> $join('->') } }"
},
"c": {
"function": "${ function($in) { ( $console.log($in); [$in, 'c'] ~> $join('->') )} }",
"output": {
"results": [],
"errors": {}
}
"function": "${ function($in) { ( $console.log($in); [$in, 'c'] ~> $join('->') )} }"
},
"d": {
"function": "${ function($in) { ( $console.log($in); [$in, 'd'] ~> $join('->') )} }",
"output": {
"results": [],
"errors": {}
}
"function": "${ function($in) { ( $console.log($in); [$in, 'd'] ~> $join('->') )} }"
},
"workflow1": "${ startEven ~> $serial([a, b]) }",
"workflow2": "${ startEven ~> $parallel([c,d]) }"
"workflow1": "${ startEvent ~> $serial([a, b]) }",
"workflow2": "${ startEvent ~> $parallel([c,d]) }"
});
await tp.initialize();
expect(tp.output.workflow1)
.toEqual(['tada->a','tada->a->b']);
.toEqual('tada->a->b');
expect(tp.output.workflow2)
.toEqual(expect.arrayContaining(['tada->c', 'tada->d']));
});
*/


18 changes: 5 additions & 13 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -393,24 +393,16 @@ export class StatedWorkflow {
}

// This function is called by the template processor to execute an array of steps in parallel
static async parallel(input, steps, context) {
const {name: workflowName, log, workflowInvocation} = context;

if (log === undefined) {
throw new Error('log is missing from context');
}
static async parallel(input, steps, context = {}) {
let {workflowInvocation} = context;

if (workflowInvocation === undefined) {
throw new Error('invocation id is missing from context');
workflowInvocation = this.generateDateAndTimeBasedID();
}

StatedWorkflow.initializeLog(log, workflowName, workflowInvocation);

let promises = [];
let serialOrdinal = 0;
for (let step of steps) {
const stepRecord = {invocationId: workflowInvocation, workflowName, stepName: step.name, serialOrdinal, branchType:"PARALLEL"};
const promise = StatedWorkflow.executeStep(step, input, log[workflowName][workflowInvocation], stepRecord)
for (let stepJson of steps) {
const promise = StatedWorkflow.runStep(workflowInvocation, stepJson, input)
.then(result => {
// step.output.results.push(result);
return result;
Expand Down

0 comments on commit 7164548

Please sign in to comment.