From 624d849a5d2912e516f47e31d53ebb2905dcead3 Mon Sep 17 00:00:00 2001 From: Luke Zappia Date: Tue, 21 Jan 2025 17:11:50 +0100 Subject: [PATCH] Remove grouping bottleneck (#42) --- src/data_processors/process_integration/main.nf | 9 ++++++--- src/workflows/run_benchmark/main.nf | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/data_processors/process_integration/main.nf b/src/data_processors/process_integration/main.nf index decbe484..18af2fc5 100644 --- a/src/data_processors/process_integration/main.nf +++ b/src/data_processors/process_integration/main.nf @@ -44,9 +44,12 @@ workflow run_wf { // group by original dataset id | map{id, state -> - [state.prevId, state] + // groupKey() allows us to set a size for each group based on the state + // which means each group can continue once it is complete + [groupKey(state.prevId, state.resolutions.size()), state] } - | groupTuple() + // Group and sort by resolution to ensure the order is consistent + | groupTuple(sort: { res1, res2 -> res1.resolution <=> res2.resolution }) // merge the clustering results into one state | map{ id, states -> @@ -60,7 +63,7 @@ workflow run_wf { def clusterings = states.collect { it.output_clustering } def newState = states[0] + ["clusterings": clusterings] - [id, newState] + [id.toString(), newState] } // merge clustering results into dataset h5ad diff --git a/src/workflows/run_benchmark/main.nf b/src/workflows/run_benchmark/main.nf index 4c9a9d3c..e4e1ad32 100644 --- a/src/workflows/run_benchmark/main.nf +++ b/src/workflows/run_benchmark/main.nf @@ -148,7 +148,8 @@ workflow run_wf { fromState: [ input_integrated: "method_output", input_dataset: "input_dataset", - expected_method_types: "method_types" + expected_method_types: "method_types", + resolutions: "resolutions" ], toState: { id, output, state -> // Add method types to the state