Skip to content

Commit

Permalink
Merge pull request #163 from conductor-sdk/feature/fail-on-running-id…
Browse files Browse the repository at this point in the history
…empotency-strategy

Added `FAIL_ON_RUNNING` Idempotency Strategy
  • Loading branch information
jmigueprieto authored Nov 27, 2024
2 parents 0379860 + 73f8e08 commit 2750549
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
1 change: 1 addition & 0 deletions sdk/model/idempotency_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ type IdempotencyStrategy string
const (
FailOnConflict IdempotencyStrategy = "FAIL"
ReturnExisting IdempotencyStrategy = "RETURN_EXISTING"
FailOnRunning IdempotencyStrategy = "FAIL_ON_RUNNING"
)
46 changes: 46 additions & 0 deletions test/integration_tests/workflow_idempotency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/conductor-sdk/conductor-go/sdk/model"
"github.com/conductor-sdk/conductor-go/sdk/workflow"
"github.com/conductor-sdk/conductor-go/sdk/workflow/executor"
"github.com/conductor-sdk/conductor-go/test/testdata"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -49,3 +50,48 @@ func TestIdempotencyCombinations(t *testing.T) {
)
assert.NoError(t, err, "Failed to delete workflow definition ", err)
}

func TestIdempotencyFailOnRunning(t *testing.T) {
executor := testdata.WorkflowExecutor
wf := workflow.NewConductorWorkflow(executor)
wf.Name("temp_wf_" + strconv.Itoa(time.Now().Nanosecond())).Version(1)
wf = wf.Add(workflow.NewSimpleTask("simple_task_1", "simple_task_1"))
err := wf.Register(true)
assert.NoError(t, err, "Failed to register workflow")

// (1) workflow should start
id, err := executor.StartWorkflow(&model.StartWorkflowRequest{Name: wf.GetName(), IdempotencyKey: "test", IdempotencyStrategy: model.FailOnRunning})
assert.NoError(t, err, "Failed to start workflow")

// (2) workflow start should fail because (1) is running
_, err = executor.StartWorkflow(&model.StartWorkflowRequest{Name: wf.GetName(), IdempotencyKey: "test", IdempotencyStrategy: model.FailOnRunning})
assert.Error(t, err, "Workflow should have failed but there was no error")

// complete task so that workflow is completed
err = executor.UpdateTaskByRefName("simple_task_1", id, model.CompletedTask, map[string]interface{}{})
assert.NoError(t, err, "Failed to update task")

checkWorkflowIsCompleted(t, executor, id)

// workflow should start
id2, err := executor.StartWorkflow(&model.StartWorkflowRequest{Name: wf.GetName(), IdempotencyKey: "test", IdempotencyStrategy: model.FailOnRunning})
assert.NoError(t, err, "Failed to start workflow")
assert.NotEqual(t, id, id2)
}

func checkWorkflowIsCompleted(t *testing.T, executor *executor.WorkflowExecutor, id string) {
timeout := time.After(5 * time.Second)
tick := time.Tick(1 * time.Second)

for {
select {
case <-timeout:
t.Fatalf("Timed out and workflow %s didn't complete", id)
case <-tick:
wf, err := executor.GetWorkflow(id, false)
assert.NoError(t, err)
assert.Equal(t, model.CompletedWorkflow, wf.Status)
return
}
}
}

0 comments on commit 2750549

Please sign in to comment.