Skip to content

Commit

Permalink
Add stimulus_id to update_graph plugin hook (#8923)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Nov 5, 2024
1 parent f340f18 commit 7d43e2f
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 1 deletion.
3 changes: 3 additions & 0 deletions distributed/diagnostics/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def update_graph(
annotations: dict[str, dict[Key, Any]],
priority: dict[Key, tuple[int | float, ...]],
dependencies: dict[Key, set[Key]],
stimulus_id: str,
**kwargs: Any,
) -> None:
"""Run when a new graph / tasks enter the scheduler
Expand Down Expand Up @@ -129,6 +130,8 @@ def update_graph(
Task calculated priorities as assigned to the tasks.
dependencies:
A mapping that maps a key to its dependencies.
stimulus_id:
ID of the stimulus causing the graph update
**kwargs:
It is recommended to allow plugins to accept more parameters to
ensure future compatibility.
Expand Down
4 changes: 4 additions & 0 deletions distributed/diagnostics/tests/test_scheduler_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ def update_graph( # type: ignore
annotations,
priority,
dependencies,
stimulus_id,
**kwargs,
) -> None:
assert scheduler is s
Expand All @@ -505,6 +506,7 @@ def update_graph( # type: ignore
assert len(priority) == 1
assert isinstance(priority["foo"], tuple)
assert dependencies == {"foo": set()}
assert stimulus_id is not None
self.success = True

plugin = UpdateGraph()
Expand Down Expand Up @@ -533,6 +535,7 @@ def update_graph( # type: ignore
annotations,
priority,
dependencies,
stimulus_id,
**kwargs,
) -> None:
assert scheduler is s
Expand All @@ -553,6 +556,7 @@ def update_graph( # type: ignore
assert k in dependencies
assert dependencies["f1"] == set()
assert dependencies["sum"] == {"f1", "f3"}
assert stimulus_id is not None

self.success = True

Expand Down
4 changes: 3 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4780,6 +4780,7 @@ def _create_taskstate_from_graph(
dependencies=dependencies,
annotations=dict(annotations_for_plugin),
priority=priority,
stimulus_id=stimulus_id,
)
except Exception as e:
logger.exception(e)
Expand Down Expand Up @@ -4846,6 +4847,7 @@ async def update_graph(
stimulus_id: str | None = None,
) -> None:
start = time()
stimulus_id = stimulus_id or f"update-graph-{start}"
self._active_graph_updates += 1
try:
logger.debug("Received new graph. Deserializing...")
Expand Down Expand Up @@ -4923,7 +4925,7 @@ async def update_graph(
# objects. This should be removed
global_annotations=annotations,
start=start,
stimulus_id=stimulus_id or f"update-graph-{start}",
stimulus_id=stimulus_id,
)
task_state_created = time()
metrics.update(
Expand Down

0 comments on commit 7d43e2f

Please sign in to comment.