From 7d43e2f18bfb5a42acd7758c090e11f934fe5588 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 5 Nov 2024 13:44:17 +0100 Subject: [PATCH] Add stimulus_id to update_graph plugin hook (#8923) --- distributed/diagnostics/plugin.py | 3 +++ distributed/diagnostics/tests/test_scheduler_plugin.py | 4 ++++ distributed/scheduler.py | 4 +++- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index 71c6cc1ed9..b3f280d15e 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -101,6 +101,7 @@ def update_graph( annotations: dict[str, dict[Key, Any]], priority: dict[Key, tuple[int | float, ...]], dependencies: dict[Key, set[Key]], + stimulus_id: str, **kwargs: Any, ) -> None: """Run when a new graph / tasks enter the scheduler @@ -129,6 +130,8 @@ def update_graph( Task calculated priorities as assigned to the tasks. dependencies: A mapping that maps a key to its dependencies. + stimulus_id: + ID of the stimulus causing the graph update **kwargs: It is recommended to allow plugins to accept more parameters to ensure future compatibility. diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index c49bd96510..3f48caa31a 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -493,6 +493,7 @@ def update_graph( # type: ignore annotations, priority, dependencies, + stimulus_id, **kwargs, ) -> None: assert scheduler is s @@ -505,6 +506,7 @@ def update_graph( # type: ignore assert len(priority) == 1 assert isinstance(priority["foo"], tuple) assert dependencies == {"foo": set()} + assert stimulus_id is not None self.success = True plugin = UpdateGraph() @@ -533,6 +535,7 @@ def update_graph( # type: ignore annotations, priority, dependencies, + stimulus_id, **kwargs, ) -> None: assert scheduler is s @@ -553,6 +556,7 @@ def update_graph( # type: ignore assert k in dependencies assert dependencies["f1"] == set() assert dependencies["sum"] == {"f1", "f3"} + assert stimulus_id is not None self.success = True diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 35f2aecb22..cd5919c5e1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4780,6 +4780,7 @@ def _create_taskstate_from_graph( dependencies=dependencies, annotations=dict(annotations_for_plugin), priority=priority, + stimulus_id=stimulus_id, ) except Exception as e: logger.exception(e) @@ -4846,6 +4847,7 @@ async def update_graph( stimulus_id: str | None = None, ) -> None: start = time() + stimulus_id = stimulus_id or f"update-graph-{start}" self._active_graph_updates += 1 try: logger.debug("Received new graph. Deserializing...") @@ -4923,7 +4925,7 @@ async def update_graph( # objects. This should be removed global_annotations=annotations, start=start, - stimulus_id=stimulus_id or f"update-graph-{start}", + stimulus_id=stimulus_id, ) task_state_created = time() metrics.update(