Skip to content

Commit

Permalink
Workflow persistence (#13)
Browse files Browse the repository at this point in the history
* refactoring to TemplateUtils, adding WorkflowPersistence, cleanup++

* bump stated version

* bump stated version with a bugfix

* allow functions in testData, add test for change data callback and rate limit

* add runaway example function

* revert accidental README change, convert pubsub to SW theme

* improve comments

* removeDataChangeCallback before setting a new one

* review feedback

* feedback++

* feedback+++

* feedback++++

---------

Co-authored-by: Sergey Sergeev <[email protected]>
  • Loading branch information
zhirafovod and Sergey Sergeev authored Jan 31, 2024
1 parent 888a2a7 commit 8dc30c4
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 249 deletions.
33 changes: 33 additions & 0 deletions example/pubsub-data-function.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Example pub/sub template: R2-D2 publishes coordinates on an interval while
# C-3PO subscribes and intercepts them; the run self-terminates after 10 messages.
# Droid R2-D2 is sending messages to the Rebel Alliance's communication channel
produceParams:
  type: "rebel-comm-channel"
  # data is a function so each $publish call produces a fresh message payload
  data: |
    ${
      function(){
        {'message': 'Rebel Fleet Coordinates', 'location': $random()}
      }
    }
  client:
    type: test
# Droid C-3PO will intercept and log each received message for the Rebel Alliance
subscribeParams: #parameters for subscribing to a holocomm transmission
  source: cloudEvent
  type: /${ produceParams.type } # subscribe to the same channel as R2-D2 to intercept messages
  # 'to' is the handler invoked per received event; it appends the event to interceptedMessages
  to: |
    /${
      function($e){(
          $set('/interceptedMessages/-', $e);
      )}
    }
  subscriberId: protocolDroid
  initialPosition: latest
  client:
    type: test
# Activates R2-D2's message transmission function every 50 milliseconds
send: "${ $setInterval( function(){ $publish(produceParams)}, 50) }"
# Activates C-3PO's message interception function
recv$: $subscribe(subscribeParams)
# interceptedMessages is where C-3PO will store the results of message decoding
interceptedMessages: [ ]
# This condition stops the operation when interceptedMessages has 10 elements
# (clears the publish interval and flips stop$ to 'missionAccomplished')
stop$: ($count(interceptedMessages)>=10?($clearInterval(send);'missionAccomplished'):'operationOngoing')
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"express": "^4.18.2",
"kafkajs": "^2.2.4",
"pulsar-client": "^1.9.0",
"stated-js": "^0.0.104"
"stated-js": "^0.0.106"
},
"devDependencies": {
"jest": "^29.7.0"
Expand Down
66 changes: 36 additions & 30 deletions src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import path from 'path';
import {WorkflowDispatcher} from "../workflow/WorkflowDispatcher.js";
import StatedREPL from "stated-js/dist/src/StatedREPL.js";
import {EnhancedPrintFunc} from "./TestTools.js";
import {debounce} from "stated-js/dist/src/utils/debounce.js";
import {rateLimit} from "stated-js/dist/src/utils/rateLimit.js";

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
Expand Down Expand Up @@ -693,35 +693,6 @@ test("test all", async () => {
.toEqual(expect.arrayContaining(['tada->c', 'tada->d']));
});

test("persist and recover from file", async () => {
const {templateProcessor:tp} = await StatedWorkflow.newWorkflow({
"startEvent": "tada",
"a": {
"function": "${ function($in) { ( $console.log($in); [$in, 'a'] ~> $join('->') )} }"
},
"b": {
"function": "${ function($in) { [$in, 'b'] ~> $join('->') } }"
},
"workflow1": "${ function($startEvent) { $startEvent ~> $serial([a, b]) } }",
"out": "${ workflow1(startEvent)}",
});
// keep steps execution logs for debugging
tp.options = {'keepLogs': true};
await tp.initialize();


// const dataChangeCallback2 = debounce(fs.writeFileSync('.state/output.json', JSON.stringify(tp.output)), 1000);
const dataChangeCallback = debounce(async (output, theseThatChanged) => {
console.log(`dataChangeCallback invocation: ${JSON.stringify(tp.output)}`);
// await fs.writeFileSync('.state/output.json', JSON.stringify(tp.output));
}, 1000);


// tp.setDataChangeCallback('/', dataChangeCallback);
expect(tp.output.out)
.toEqual('tada->a->b');
});

test("Multiple template processors", async () => {
const t = {
"startEvent": "tada",
Expand Down Expand Up @@ -762,3 +733,38 @@ test("Multiple template processors", async () => {
.toEqual(expect.arrayContaining(['tada->c', 'tada->d']));

});

test("Template Data Change Callback with rate limit", async () => {
    // Load the pub/sub example template from the YAML file
    const yamlFilePath = path.join(__dirname, '../', '../', 'example', 'pubsub-data-function.yaml');
    const templateYaml = fs.readFileSync(yamlFilePath, 'utf8');
    const template = yaml.load(templateYaml);
    // instantiate template processor
    const {templateProcessor: tp} = await StatedWorkflow.newWorkflow(template);
    // keep steps execution logs for debugging
    tp.options = {'keepLogs': true};

    const counts = [];

    // Rate-limited callback records how many messages had been intercepted
    // each time it fires; with a 1s window it should fire exactly twice.
    const dataChangeCallback = rateLimit(async (output, theseThatChanged) => {
        counts.push(output.interceptedMessages.length);
    }, 1000);
    tp.setDataChangeCallback('/', dataChangeCallback);

    await tp.initialize();
    // The template's stop$ expression yields 'operationOngoing' until 10
    // messages have been intercepted, then 'missionAccomplished'.
    // (Comparing against any other string would make this loop a no-op.)
    while (tp.output.stop$ === 'operationOngoing') {
        await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms
    }
    // Wait for the trailing-edge invocation of the rate-limited callback
    while (counts.length < 2) {
        await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms
    }

    // Assertions
    expect(tp.output.stop$).toEqual('missionAccomplished');
    // Assert that the data change callback was called twice by the rate limit function, on the first and the last events:
    // - the first data change callback fires immediately, before any setData completed (leading edge), so it sees 0 messages
    // - the last data change callback fires after all setData calls succeeded (the trailing-edge call is held until
    //   the rate limit wait time is over), so it sees all 10 messages
    expect(counts).toEqual([0, 10]);

});
108 changes: 32 additions & 76 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import winston from "winston";
import {WorkflowDispatcher} from "./WorkflowDispatcher.js";
import {StepLog} from "./StepLog.js";
import Step from "./Step.js";
import {createPersistence} from "./Persistence.js";
import DependencyFinder from "stated-js/dist/src/DependencyFinder.js";
import jp from "stated-js/dist/src/JsonPointer.js";
import {createStepPersistence} from "./StepPersistence.js";
import {TemplateUtils} from "./utils/TemplateUtils.js";
import {WorkflowPersistence} from "./WorkflowPersistence.js";

//This class is a wrapper around the TemplateProcessor class that provides workflow functionality
export class StatedWorkflow {
Expand All @@ -43,7 +43,7 @@ export class StatedWorkflow {
level: "error", //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default
});

static persistence = new createPersistence();
static persistence = createStepPersistence();

static FUNCTIONS = {
"id": StatedWorkflow.generateDateAndTimeBasedID.bind(this),
Expand All @@ -57,32 +57,46 @@ export class StatedWorkflow {
//"workflow": StatedWorkflow.workflow.bind(this)
};

constructor(templateProcessor, persistence ){
constructor(templateProcessor, stepPersistence ){
this.templateProcessor = templateProcessor;
this.persistence = persistence;
this.stepPersistence = stepPersistence;
}

// this method returns a StatedWorkflow instance with TemplateProcesor with the default functions and Stated Workflow
// functions. It also initializes persistence store, and set generator functions.
static async newWorkflow(template, persistenceType = 'noop') {
const persistence = new createPersistence({persistenceType: persistenceType});
await persistence.init();
static async newWorkflow(template, stepPersistenceType = 'noop', context = {}) {
const stepPersistence = createStepPersistence({persistenceType: stepPersistenceType});
await stepPersistence.init();
// TODO: fix CliCore.setupContext to respect context passed to the constructor
// const tp = new TemplateProcessor(template, {...TemplateProcessor.DEFAULT_FUNCTIONS, ...StatedWorkflow.FUNCTIONS});
TemplateProcessor.DEFAULT_FUNCTIONS = {...TemplateProcessor.DEFAULT_FUNCTIONS, ...StatedWorkflow.FUNCTIONS};
const tp = new TemplateProcessor(template);
const tp = new TemplateProcessor(template, context);
tp.functionGenerators.set("serial", StatedWorkflow.serialGenerator);
tp.functionGenerators.set("parallel", StatedWorkflow.parallelGenerator);
tp.functionGenerators.set("recover", StatedWorkflow.recoverGenerator);
tp.logLevel = logLevel.ERROR; //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default
tp.onInitialize = WorkflowDispatcher.clear; //must remove all subscribers when template reinitialized
return new StatedWorkflow(tp, persistence);
return new StatedWorkflow(tp, stepPersistence);
}

async initialize() {
await this.templateProcessor.initialize();
}

setWorkflowPersistence() {
const persistence = new WorkflowPersistence({workflowName: this.templateProcessor.input.name});
const cbFn = async (data, jsonPtr, removed) => {
try {
await persistence.persist(this.templateProcessor);
} catch (error) {
console.error(`Error persisting workflow state: ${error}`);
}
}
this.templateProcessor.removeDataChangeCallback('/');
this.templateProcessor.setDataChangeCallback('/',cbFn);

}

static async logFunctionInvocation(stage, args, result, error = null, log) {
const logMessage = {
context: stage.name,
Expand Down Expand Up @@ -353,7 +367,7 @@ export class StatedWorkflow {
if(testData !== undefined){
this.logger.debug(`No 'real' subscription created because testData provided for subscription params ${StatedREPL.stringify(subscriptionParams)}`);
const dispatcher = WorkflowDispatcher.getDispatcher(subscriptionParams);
dispatcher.addBatch(testData);
await dispatcher.addBatch(testData);
await dispatcher.drainBatch(); // in test mode we wanna actually wait for all the test events to process
return;
}
Expand Down Expand Up @@ -393,69 +407,11 @@ export class StatedWorkflow {

}

/**
*
* @param resolvedJsonPointers
* @param steps
* @param metaInf
*/
static validateStepPointers(resolvedJsonPointers, steps, metaInf, procedureName) {
if (resolvedJsonPointers.length !== steps.length) {
throw new Error(`At ${metaInf.jsonPointer__},
'${$procedureName}(...)' was passed ${steps.length} steps, but found ${resolvedJsonPointers.length} step locations in the document.`);
}
}

static async resolveEachStepToOneLocationInTemplate(metaInf, tp, procedureName){
const filterExpression = `**[procedure.value='${procedureName}']`;
const jsonPointers = await StatedWorkflow.findStepDependenciesInTemplate(metaInf, filterExpression);
const resolvedPointers = StatedWorkflow.drillIntoStepArrays(jsonPointers, tp);
return resolvedPointers;
}

static async findStepDependenciesInTemplate(metaInf, filterExpression){
const ast = metaInf.compiledExpr__.ast();
let depFinder = new DependencyFinder(ast);
depFinder = await depFinder.withAstFilterExpression(filterExpression);
if(depFinder.ast){
return depFinder.findDependencies().map(jp.compile)
}
return [];
}

/**
* make sure that if the serial function has numSteps, that have located storage for each of the steps in the
* document.
* @param jsonPointers
*/
static drillIntoStepArrays(jsonPointers=[], tp){
const resolved = [];
jsonPointers.forEach(p=>{
if(!jp.has(tp.output, p)){
throw new Error(`Cannot find ${p} in the template`);
}
const loc = jp.get(tp.output, p);
//if serial has a dependency on an array, for example $serial(stepsArray) or
// $serial(step1~>append(otherStepsArray)), the we drill into the array and mine out json pointers
// to each element of the array
if(Array.isArray(loc)){
for(let i=0;i<loc.length;i++){
resolved.push(p+"/"+i);
}
}else{
resolved.push(p);
}

});
return resolved;

}

static async serialGenerator(metaInf, tp) {
return async (input, steps, context) => {

const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'serial'); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'serial');
const resolvedJsonPointers = await TemplateUtils.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'serial'); //fixme todo we should avoid doing this for every jsonata evaluation
TemplateUtils.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'serial');

return serial(input, steps, context, resolvedJsonPointers, tp);
}
Expand Down Expand Up @@ -488,8 +444,8 @@ export class StatedWorkflow {
let parallelDeps = {};
return async (input, steps, context) => {

const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'parallel'); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'parallel');
const resolvedJsonPointers = await TemplateUtils.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'parallel'); //fixme todo we should avoid doing this for every jsonata evaluation
TemplateUtils.validateStepPointers(resolvedJsonPointers, steps, metaInf, 'parallel');

return parallel(input, steps, context, resolvedJsonPointers, tp);
}
Expand Down Expand Up @@ -554,8 +510,8 @@ export class StatedWorkflow {
static async recoverGenerator(metaInf, tp) {
let parallelDeps = {};
return async (step, context) => {
const resolvedJsonPointers = await StatedWorkflow.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'recover'); //fixme todo we should avoid doing this for every jsonata evaluation
StatedWorkflow.validateStepPointers(resolvedJsonPointers, [step], metaInf, 'recover');
const resolvedJsonPointers = await TemplateUtils.resolveEachStepToOneLocationInTemplate(metaInf, tp, 'recover'); //fixme todo we should avoid doing this for every jsonata evaluation
TemplateUtils.validateStepPointers(resolvedJsonPointers, [step], metaInf, 'recover');
return StatedWorkflow.recover(step, context, resolvedJsonPointers?.[0], tp);
}
}
Expand Down
Loading

0 comments on commit 8dc30c4

Please sign in to comment.