From c38c509e41c7059e0c697b0584276fa26dd0e25a Mon Sep 17 00:00:00 2001 From: Florian Jetter Date: Thu, 7 Nov 2024 14:25:03 +0100 Subject: [PATCH] Fix flaky test_server_comms_mark_active_handlers (#8927) --- distributed/tests/test_core.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 6f4836ae08..02eccf6437 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -1052,9 +1052,12 @@ async def test_server_comms_mark_active_handlers(): ensure this is properly reflected and released. The sentinel for "open comm but no active handler" is `None` """ + in_handler = asyncio.Event() + unblock_handler = asyncio.Event() async def long_handler(comm): - await asyncio.sleep(0.2) + in_handler.set() + await unblock_handler.wait() return "done" async with Server({"wait": long_handler}) as server: @@ -1063,9 +1066,9 @@ async def long_handler(comm): comm = await connect(server.address) await comm.write({"op": "wait"}) - while not server._comms: - await asyncio.sleep(0.05) + await in_handler.wait() assert set(server._comms.values()) == {"wait"} + unblock_handler.set() assert server.incoming_comms_open == 1 assert server.incoming_comms_active == 1 @@ -1106,9 +1109,12 @@ def validate_dict(server): async with Server({}) as server2: rpc_ = server2.rpc(server.address) + in_handler.clear() + unblock_handler.clear() task = asyncio.create_task(rpc_.wait()) - while not server.incoming_comms_active: - await asyncio.sleep(0.1) + + await in_handler.wait() + assert server.incoming_comms_active == 1 assert server.incoming_comms_open == 1 assert server.outgoing_comms_active == 0 @@ -1120,6 +1126,7 @@ def validate_dict(server): assert server2.outgoing_comms_open == 1 validate_dict(server) + unblock_handler.set() await task assert server.incoming_comms_active == 0 assert server.incoming_comms_open == 1