Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented voluntary cancellation in worker threads #629

Merged
merged 12 commits into from
Nov 22, 2023
Merged
19 changes: 12 additions & 7 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2107,9 +2107,15 @@ async def run_sync_in_worker_thread(

@classmethod
def check_cancelled(cls) -> None:
scope = threadlocals.current_cancel_scope
if scope.cancel_called or scope._parent_cancelled():
raise CancelledError
scope: CancelScope | None = threadlocals.current_cancel_scope
while scope is not None:
if scope.cancel_called:
raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")

if scope.shield:
return

scope = scope._parent_scope

@classmethod
def run_async_from_thread(
Expand All @@ -2125,6 +2131,8 @@ async def task_wrapper(scope: CancelScope) -> T_Retval:
scope._tasks.add(task)
try:
return await func(*args)
except CancelledError as exc:
raise concurrent.futures.CancelledError(str(exc)) from None
Comment on lines +2131 to +2132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this break the chain of any exceptions attached to the CancelledError? I guess that's probably rare.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the chain is preserved anyway, but I can check if you like.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked and it doesn't seem to matter. There's a C level black box between that reraise and the calling synchronous code.

finally:
scope._tasks.discard(task)

Expand All @@ -2135,10 +2143,7 @@ async def task_wrapper(scope: CancelScope) -> T_Retval:
f: concurrent.futures.Future[T_Retval] = context.run(
asyncio.run_coroutine_threadsafe, wrapper, loop
)
try:
return f.result()
except concurrent.futures.CancelledError:
raise CancelledError from None
return f.result()

@classmethod
def run_sync_from_thread(
Expand Down