From bea9cb7dfc5f0a40798a3634cb6e801964d4d6f5 Mon Sep 17 00:00:00 2001 From: Yuguang Wang Date: Tue, 28 Mar 2023 16:15:12 +0800 Subject: [PATCH] Fixes KeyError for 'sources' when accessing comsumed source components Also use `stop` instead of `break_current_loop` to control the flow. Fixes OLCS-510 --- openlcs/libs/corgi.py | 6 +++++- openlcsd/flow/tasks.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/openlcs/libs/corgi.py b/openlcs/libs/corgi.py index 0a683c8b..87f852d1 100644 --- a/openlcs/libs/corgi.py +++ b/openlcs/libs/corgi.py @@ -597,7 +597,11 @@ def should_yield_data(component, result): try: component = next(components) except StopIteration: - yield result + # StopIteration may appear when `should_yield_data` is + # False, i.e., the last iteration(before exhausting) may + # accumulates components less than `num_components`. + if len(result) > 1: + yield result break else: subscription_sources, subscription_missings = [], [] diff --git a/openlcsd/flow/tasks.py b/openlcsd/flow/tasks.py index 42bda6fb..bf23d031 100644 --- a/openlcsd/flow/tasks.py +++ b/openlcsd/flow/tasks.py @@ -1495,7 +1495,7 @@ def populate_source_components(context, engine): source_components = next(generator) except StopIteration: # Stop the flow since all chunks are consumed. - engine.break_current_loop() + engine.stop() else: context["source_components"] = source_components