Skip to content

Commit

Permalink
Cleanups jan20 (#11)
Browse files Browse the repository at this point in the history
* wip

* WIP

* all tests pass

---------

Co-authored-by: Geoffrey Hendrey <[email protected]>
  • Loading branch information
geoffhendrey and Geoffrey Hendrey authored Jan 22, 2024
1 parent ecb530b commit be685e7
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 322 deletions.
139 changes: 40 additions & 99 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,107 +373,48 @@ clustered runtime.
Stated-Workflows provides a set of functions that allow you to integrate with cloud events, consuming and producing from
Kafka or Pulsar message buses, as well as HTTP. The following example shows how to use the `publish` and `subscribe`
functions. Below producer and subscriber configs are using `test` clients, but `kafka` and `pulsar` clients can be used
to communicate with the actual message buses.
to communicate with the actual message buses. Data for testing can be fed in by setting the `data` field of the `produceParams`.
As shown below, the test data is set to `['luke', 'han', 'leia']`. The subscriber's `to` function, `joinResistance`, appends
each received rebel to the `rebelForces` array.

```yaml
falken$> cat example/pubsub.yaml
# producer will be sending some test data
produceParams:
type: "my-topic"
data: ['luke', 'han', 'leia']
client:
type: test
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${joinResistance}
subscriberId: rebelArmy
initialPosition: latest
client:
type: test
joinResistance: /${ function($rebel){ $set('/rebelForces', rebelForces~>$append($rebel))} }
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
````
We can use the REPL to run the pubsub.yaml example, and monitor the `/rebelForces` by tailing it until it contains all
three of the published messages.
```json ["data=['luke', 'han', 'leia']"]
> .init -f "example/pubsub.yaml" --tail "/rebelForces until $=['luke', 'han', 'leia']"
Started tailing... Press Ctrl+C to stop.
[
"luke",
"han",
"leia"
]
```json
> .init -f "example/pubsub.yaml"
{
"produceParams": {
"type": "my-topic",
"data": "${ [1..5].({'name': 'nozzleTime', 'rando': $random()}) }",
"client": {
"type": "test"
}
},
"subscribeParams": {
"source": "cloudEvent",
"type": "/${ produceParams.type }",
"to": "/${ function($e){( $set('/rxLog', rxLog~>$append($e)); )} }",
"subscriberId": "dingus",
"initialPosition": "latest",
"client": {
"type": "test"
}
},
"send$": "$publish(produceParams)",
"recv$": "$subscribe(subscribeParams)",
"rxLog": []
}
```
<details>
<summary>Execution output (click to expand)</summary>
```json ["$count(data.rxLog)=6"]
> .init -f "example/pubsub.yaml" --tail "/ until $count(rxLog)>=5"
{
"produceParams": {
"type": "my-topic",
"data": [
{
"name": "nozzleTime",
"rando": 0.5776065858162405
},
{
"name": "nozzleTime",
"rando": 0.14603495732221994
},
{
"name": "nozzleTime",
"rando": 0.6747697712879064
},
{
"name": "nozzleTime",
"rando": 0.8244336074302101
},
{
"name": "nozzleTime",
"rando": 0.7426610894846484
}
],
"client": {
"type": "test"
}
},
"subscribeParams": {
"source": "cloudEvent",
"type": "my-topic",
"to": "{function:}",
"subscriberId": "dingus",
"initialPosition": "latest",
"client": {
"type": "test"
}
},
"send$": null,
"recv$": null,
"rxLog": [
{
"name": "nozzleTime",
"rando": 0.6677321749548661
},
{
"name": "nozzleTime",
"rando": 0.46779260195749184
},
{
"name": "nozzleTime",
"rando": 0.3316065714852454
},
{
"name": "nozzleTime",
"rando": 0.7331875081132901
},
{
"name": "nozzleTime",
"rando": 0.4174872067342268
},
{
"name": "nozzleTime",
"rando": 0.5776065858162405
}
]
}
```
</details>


## Durability
Pure Stated does not provide durability or high availability. Stated-workflows adds
Expand Down
19 changes: 9 additions & 10 deletions example/pubsub.yaml
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
# producer will be sending data function output to the "type" topic
# producer will be sending some test data
produceParams:
type: "my-topic"
data: "${ [1..5].({'name': 'nozzleTime', 'rando': $random()}) }"
data: ['luke', 'han', 'leia']
client:
type: test
# producer will be invoking "to" function for each consumed event
subscribeParams: #parameters for subscribing to a cloud event
# the subscriber's 'to' function will be called on each received event
subscribeParams: #parameters for subscribing to an event
source: cloudEvent
type: /${ produceParams.type } # subscribe to the same topic as we are publishing to test events
to: /${ function($e){(
$set('/rxLog', rxLog~>$append($e));
)} }
subscriberId: dingus
to: /${joinResistance}
subscriberId: rebelArmy
initialPosition: latest
client:
type: test
joinResistance: /${ function($rebel){ $set('/rebelForces', rebelForces~>$append($rebel))} }
# starts producer function
send$: $publish(produceParams)
# starts consumer function
recv$: $subscribe(subscribeParams)
# rxLog is a field of the template where the consumer function will be storing results of event processing
rxLog: [ ]
# the subscriber's `to` function will write the received data here
rebelForces: [ ]
4 changes: 2 additions & 2 deletions src/test/StatedWorkflow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ test("pubsub", async () => {
// instantiate template processor
const tp = await StatedWorkflow.newWorkflow(template);
await tp.initialize();
while(tp.output.rxLog.length < 5){
while(tp.output.rebelForces.length < 3){
await new Promise(resolve => setTimeout(resolve, 50)); // Poll every 50ms
}
expect(Object.keys(tp.output.rxLog).length).toBe(5);
expect(tp.output.rebelForces).toEqual(['luke', 'han', 'leia']);
}, 8000);

test("correlate", async () => {
Expand Down
11 changes: 8 additions & 3 deletions src/workflow/StatedWorkflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ export class StatedWorkflow {

// this methd returns a TemplateProcessor instance with the default functions and Stated Workflow functions. It also
// initializes persistence store, and set generator functions.
static async newWorkflow(template, persistenceType = 'noop') {
this.persistence = new createPersistence({persistenceType: persistenceType});
//FIXME TODO -- newXXX should return an XXX
static async newWorkflow(template={}, persistenceType = 'noop') {
this.persistence = createPersistence({persistenceType: persistenceType});
await this.persistence.init();
TemplateProcessor.DEFAULT_FUNCTIONS = {...TemplateProcessor.DEFAULT_FUNCTIONS, ...StatedWorkflow.FUNCTIONS};
const tp = new TemplateProcessor(template);
tp.functionGenerators.set("serial", StatedWorkflow.serialGenerator);
tp.logLevel = logLevel.ERROR; //log level must be ERROR by default. Do not commit code that sets this to DEBUG as a default
tp.onInitialize = WorkflowDispatcher.clear; //must remove all subscribers when template reinitialized
return tp;
}

Expand Down Expand Up @@ -159,7 +161,7 @@ export class StatedWorkflow {
if (clientParams && clientParams.type === 'test') {
this.logger.debug(`test client provided, will not publish to 'real' message broker for publish parameters ${StatedREPL.stringify(params)}`);
WorkflowDispatcher.addBatchToAllSubscribers(type, data);
return;
return "done";
}

const {type:clientType} = clientParams
Expand All @@ -170,6 +172,7 @@ export class StatedWorkflow {
}else{
throw new Error(`Unsupported clientType: ${clientType}`);
}
return "done";
}

static publishKafka(params, clientParams) {
Expand Down Expand Up @@ -358,6 +361,7 @@ export class StatedWorkflow {
}else{
throw new Error(`unsupported client.type in ${StatedREPL.stringify(subscriptionParams)}`);
}
return `listening clientType=${clientType} ... `
}

static onHttp(subscriptionParams) {
Expand All @@ -374,6 +378,7 @@ export class StatedWorkflow {
console.log(`Server started on http://localhost:${StatedWorkflow.port}`);
});

return "listening http ..."

}

Expand Down
71 changes: 0 additions & 71 deletions src/workflow/StatedWorkflowConsumer.js

This file was deleted.

17 changes: 0 additions & 17 deletions src/workflow/StatedWorkflowPublisher.js

This file was deleted.

Loading

0 comments on commit be685e7

Please sign in to comment.