Migrate to TemplateProcessor.setData, fix tests, draft persistence types. (#8)

* Persistence noop in-memory

* WIP separate Persistence class

* add Persistence interface and draft different implementations

* cleanup Persistence, add jsonPath to store argument

* propagate jsonPath

* add invocation path to log

* added state files

* update to the latest version

* switch to memory persistence, expose persistenceType as a constructor parameter

* revert tests to noop persistence behavior

* uncomment perf test

* docker build

* draft support for TemplateProcessor for using setData

* make setData work for serialGenerator

* pass newly created templateProcessor to the repl

* update version, WIP setData

* convert workflow to a function to avoid reevaluation on every step log change

* migrate to static helper functions to resolve json paths

* commented out perf and correlate failing tests

* uncomment perf test with doubled perf time to ensure no flaky tests on intel chip

* fix correlate test

* add a server to start the workflow

* API to list all workflows

* add verbose logging

* fix retryCount

* README++, subscribe/produce

* README++

* locks++

* fix pubsub test

* add table of content

---------

Co-authored-by: Sergey Sergeev <[email protected]>
zhirafovod and Sergey Sergeev authored Jan 19, 2024
1 parent 0e26b72 commit ecb530b
Showing 22 changed files with 2,354 additions and 1,220 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -139,3 +139,6 @@ dist

# IDEs
.vscode

# Internal state storage
.state
19 changes: 19 additions & 0 deletions Dockerfile
@@ -0,0 +1,19 @@
# Official Node.js runtime
FROM node:20.5.1

# Set the working directory inside the container
WORKDIR /usr/src/app

# Bundle the application source code inside the container
COPY src ./src
COPY package.json package-lock.json stated-workflow.js stated-workflow-docker.js README.md ./
COPY example ./example

# Install application dependencies
RUN npm install

# Grant execute permissions for the stated-workflow-docker.js file
RUN chmod +x stated-workflow-docker.js

# Start an example workflow which listens on port 8080 for cloud events
CMD ["node", "--experimental-vm-modules", "./stated-workflow-docker.js", "-f", "example/wfHttp01.yaml"]
199 changes: 161 additions & 38 deletions README.md
@@ -1,16 +1,32 @@
![Stated-Workflows](https://raw.githubusercontent.com/geoffhendrey/jsonataplay/main/stated-workflows.svg)

<!-- TOC -->
* [Overview](#overview)
* [Getting Started](#getting-started)
* [Installation](#installation)
* [Running the REPL](#running-the-repl)
* [Stated Template Jobs](#stated-template-jobs-)
* [Job Concurrency](#job-concurrency)
* [Internal Job Concurrency](#internal-job-concurrency)
* [Stated Workflow Functions](#stated-workflow-functions)
* [Cloud Events](#cloud-events)
* [Durability](#durability)
* [Workflow Steps](#workflow-steps)
* [Error Handling](#error-handling)
* [retries](#retries)
<!-- TOC -->

# Overview
Stated Workflows is a collection of functions for a lightweight and scalable event-driven workflow engine using
[Stated](https://github.com/cisco-open/stated) template engine.
This README assumes some familiarity with the [Stated REPL](https://github.com/cisco-open/stated#running-the-repl).
If you don't have Stated, [get it now](https://github.com/cisco-open/stated#getting-started). The key benefits of
Stated Workflows are:
* __Easy__ - Stated Workflows are easier to express and comprehend than other workflow languages
* __Testable__ - Stated Workflows are testable. Every Stated Workflow can be tested from a REPL and behaves exactly the same locally as it does in a Stated Workflow cluster.
* __Interactive__ - As you can see from the examples in this README, you can interact directly with workflows from the REPL
* __Transparent__ - Stated Workflows takes a "What You See Is What You Get" approach to workflows. Stated-Workflows is the only workflow engine with a JSON-centric approach to data and durability.
* __Highly Available__ - Stated Workflows can be run in an easy-to-scale, k8s-friendly cluster for scaling, durability, and high availability

## Getting Started

@@ -43,15 +59,15 @@ You can start the REPL by running the `stateflow` command in your terminal:
stateflow
```
The REPL will launch, allowing you to interact with the stated-js library. In order to launch properly you need to have
`node` on your path. `stateflow` is a wrapper script that simply calls `stated-workflow.js`, which contains this
shebang: `#!/usr/bin/env node --experimental-vm-modules`.

For example you can enter this command in the REPL:
```bash
> .init -f "example/homeworld.json"
```
# Stated Template Jobs
A job is a pure Stated Template that runs to completion and does not receive any asynchronous inputs.
A job has a beginning, and an end. Here is a job that uses the Starwars API to search for Luke Skywalker's details,
extract the homeworld URL, retrieve the homeworld details, and extract the homeworld's name.
```json
@@ -64,7 +80,7 @@ extract the homeworld URL, retrieve the homeworld details, and extract the homeworld's name.
```
![homeworld workflow](https://raw.githubusercontent.com/geoffhendrey/jsonataplay/main/homeworld-workflow.svg)

Try it, from the [Stated REPL](https://github.com/cisco-open/stated#running-the-repl). The `.init` command loads the
example

```json
@@ -203,7 +219,7 @@ shows us that for an origin of `/homeworldDetails`, the DAG flows to `/homeworld


Let's compare this to its equivalent in CNCF Serverless Workflows. As you can see, with no expression analyzer and
no internal DAG builder, the developer of a CNCF workflow must specify the states, and the transitions between states.
<details>
<summary>CNCF Serverless Workflow (click to expand...a lot)</summary>
<pre>
@@ -269,14 +285,8 @@ no internal DAG builder, the developer of a CNCF workflow must specify the state

</details>

### Job Concurrency
Jobs can be run concurrently because each job's state is totally encapsulated
in its template variables. The following JS code shows how to launch 10 jobs
in parallel.

@@ -316,10 +326,10 @@ runParallel(template, 10)
.catch(error => console.error(error));

```
### Internal Job Concurrency
Let's modify our homeworlds example to make a concurrent homeworlds example.
We have used the stated `!` operator to remove `personDetails` and `homeworldDetails` from the output to avoid clutter.
JSONata automatically maps expressions over arrays.
```json
> .init -f "example/concurrent-homeworlds.json"
{
@@ -351,26 +361,139 @@ JSONata automatically makes array
]
}
```
# Stated Workflow Functions
Stated alone is a powerful and concise workflow engine. So why do
we need Stated-Workflows and what is it? Stated-Workflows is a set
of functions that provide integration with cloud events, durability and high
availability to workflows, when they are executed in the Stated-Workflows
clustered runtime.

## Cloud Events
Stated-Workflows provides a set of functions that allow you to integrate with cloud events, consuming and producing from
Kafka or Pulsar message buses, as well as HTTP. The following example shows how to use the `publish` and `subscribe`
functions. Below producer and subscriber configs are using `test` clients, but `kafka` and `pulsar` clients can be used
to communicate with the actual message buses.

```json
> .init -f "example/pubsub.yaml"
{
"produceParams": {
"type": "my-topic",
"data": "${ [1..5].({'name': 'nozzleTime', 'rando': $random()}) }",
"client": {
"type": "test"
}
},
"subscribeParams": {
"source": "cloudEvent",
"type": "/${ produceParams.type }",
"to": "/${ function($e){( $set('/rxLog', rxLog~>$append($e)); )} }",
"subscriberId": "dingus",
"initialPosition": "latest",
"client": {
"type": "test"
}
},
"send$": "$publish(produceParams)",
"recv$": "$subscribe(subscribeParams)",
"rxLog": []
}
```
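To target a real message bus instead of the in-memory `test` client, switch `client.type` to `kafka` or `pulsar`. The snippet below is an illustrative sketch only; the `brokers` key and its format are assumptions, not confirmed configuration fields:

```yaml
# Hypothetical sketch: swap the test client for a kafka client.
# The 'brokers' field is an assumption for illustration purposes.
produceParams:
  type: my-topic
  data: "${ [1..5].({'name': 'nozzleTime', 'rando': $random()}) }"
  client:
    type: kafka
    brokers: [ "localhost:9092" ]
```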
<details>
<summary>Execution output (click to expand)</summary>
```json ["$count(data.rxLog)=6"]
> .init -f "example/pubsub.yaml" --tail "/ until $count(rxLog)>=5"
{
"produceParams": {
"type": "my-topic",
"data": [
{
"name": "nozzleTime",
"rando": 0.5776065858162405
},
{
"name": "nozzleTime",
"rando": 0.14603495732221994
},
{
"name": "nozzleTime",
"rando": 0.6747697712879064
},
{
"name": "nozzleTime",
"rando": 0.8244336074302101
},
{
"name": "nozzleTime",
"rando": 0.7426610894846484
}
],
"client": {
"type": "test"
}
},
"subscribeParams": {
"source": "cloudEvent",
"type": "my-topic",
"to": "{function:}",
"subscriberId": "dingus",
"initialPosition": "latest",
"client": {
"type": "test"
}
},
"send$": null,
"recv$": null,
"rxLog": [
{
"name": "nozzleTime",
"rando": 0.6677321749548661
},
{
"name": "nozzleTime",
"rando": 0.46779260195749184
},
{
"name": "nozzleTime",
"rando": 0.3316065714852454
},
{
"name": "nozzleTime",
"rando": 0.7331875081132901
},
{
"name": "nozzleTime",
"rando": 0.4174872067342268
},
{
"name": "nozzleTime",
"rando": 0.5776065858162405
}
]
}
```
</details>

## Durability
Pure Stated does not provide durability or high availability. Stated-Workflows adds
the dimension of _durability_ and _high-availability_ to template execution. To achieve these "ilities", Stated-Workflows must
run in a Stated-Workflow cluster. However, it is not necessary to run in a cluster to write, test, and debug
Stated-Workflows locally. As long as you don't "unplug" the stated REPL, it will produce functionally the same result
as running in a Stated-Workflow cluster. Stated-Workflows provides a "local cluster" option where you can test the
_durability_ of stated workflows by unceremoniously "killing" the REPL and then restarting the workflow at a later time.

## Workflow Steps
Stated provides durability by defining the Step as the unit of durability. A step
is nothing more than a JSON object with a field named 'function', whose value is a JSONata `function`:
```json
{
"function": "${ function($in){ $in + 42 } }"
}
```
Let's recast our homeworld example using Steps. This will give the Job durability, so that it
can fail and be restarted. When a step function is called, the step's log is populated with
an entry corresponding to a unique `invocationId` for the workflow. The log captures the `args`
that were passed to the step function, as well as the function's output (`out`).
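Conceptually, a step's log is keyed by `invocationId` and records `args` and `out` per invocation. The sketch below is illustrative only; the exact log layout and the sample `invocationId` are assumptions, not the engine's documented format:

```json
{
  "function": "${ function($in){ $in + 42 } }",
  "log": {
    "hypothetical-invocation-id-001": {
      "args": 1,
      "out": 43
    }
  }
}
```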

![steps](https://raw.githubusercontent.com/geoffhendrey/jsonataplay/main/homeworld-workflow%20-%20Page%202.svg)
@@ -719,8 +842,8 @@ the `homeworld-steps.json` workflow, with `--options` that preserve the logs of
</details>

# Error Handling
If a step function throws an `Error`, or returns `undefined`, the invocation log will contain a `fail`. In the
example below we intentionally break the second step by concatenating "--broken--" to the homeworld URL.
```json
> .init -f "example/homeworlds-steps-error.json"
{
Expand Down Expand Up @@ -949,9 +1072,9 @@ $serial execution halts on fail.
```
</details>

### retries
Each step can provide an optional boolean function `shouldRetry`, which accepts the invocation log as an argument. If it
returns true, the function will be retried.
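For instance, a step that retries a bounded number of times might look like the sketch below. This is an assumption-laden illustration: `$fetchMyData` is a hypothetical function, and the `retryCount` field on the invocation log is inferred from this commit's "fix retryCount" change rather than documented API:

```json
{
  "function": "${ function($in){ $fetchMyData($in) } }",
  "shouldRetry": "${ function($invocationLog){ $invocationLog.fail and $invocationLog.retryCount < 4 } }"
}
```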
```json
> .init -f example/homeworlds-steps-with-retry.json
{
6 changes: 5 additions & 1 deletion README.test.js
@@ -19,6 +19,10 @@ import {StatedWorkflow} from "./src/workflow/StatedWorkflow.js";


TemplateProcessor.DEFAULT_FUNCTIONS = {...TemplateProcessor.DEFAULT_FUNCTIONS, ...StatedWorkflow.FUNCTIONS};
const tp = new TemplateProcessor();
tp.functionGenerators.set("serial", StatedWorkflow.serialGenerator);

parseMarkdownAndTestCodeblocks('./README.md', new CliCore());
const cliCore = new CliCore(tp);

parseMarkdownAndTestCodeblocks('./README.md', cliCore);

26 changes: 26 additions & 0 deletions docker/docker-compose.yaml
@@ -0,0 +1,26 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.11-1.1.1
ports:
- "9092:9092"
links:
- zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: ${HOST_IP}
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_CREATE_TOPICS: "topic-test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
pulsar:
image: apachepulsar/pulsar:latest
command: bin/pulsar standalone
ports:
- "6650:6650"
- "8080:8080"
3 changes: 1 addition & 2 deletions example/correlate.yaml
@@ -31,8 +31,7 @@ listenForCommand:
$e.type = 'DO_IT'?(
$console.log('RECEIVED_COMMAND ' & $e);
$set('/state', 'RECEIVED_COMMAND');
$set('/resp/data/correlationId', $e.correlationId);
$publish(resp);
$publish(resp ~> |data[0]|{correlationId: $e.correlationId}|);
):0;
)} }
subscriberId: commandReceiver
4 changes: 1 addition & 3 deletions example/pubsub.yaml
@@ -16,10 +16,8 @@ subscribeParams: #parameters for subscribing to a cloud event
client:
type: test
# starts producer function
send$: $setInterval(function(){$publish(produceParams)}, 100)
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# rxLog is a field of the template where the consumer function will be storing results of event processing
rxLog: [ ]
# this is a condition that will stop the workflow when rxLog has 5 elements
stop$: ($count(rxLog)=5?($clearInterval(send$);'done'):'still going')
11 changes: 3 additions & 8 deletions example/wfHttp01.yaml
@@ -6,12 +6,7 @@ subscribeParams: #parameters for subscribing to a http request

myWebLambda$: |
function($con){
    $con.res.send("Hello Stated Lambda")
}
2 changes: 1 addition & 1 deletion example/wfPerf01.yaml
@@ -1,7 +1,7 @@
start$: $subscribe(subscribeParams)
name: nozzleWork
subscribeParams: #parameters for subscribing to a cloud event
testData: "${ [1..10000].({'name': 'nozzleTime', 'order':$}) }"
testData: "${ [1..300].({'name': 'nozzleTime', 'order':$}) }"
type: sys:cron
filter$: function($e){ $e.name='nozzleTime' }
to: ../${myWorkflow$}
