Cancel asyncio tasks on worker close#6098
Conversation
|
@graingert do you have time to review this? |
distributed/tests/test_worker.py
Outdated
|
|
||
| async def f(ev): | ||
| await ev.set() | ||
| await asyncio.Future() # Block indefinitely |
There was a problem hiding this comment.
| await asyncio.Future() # Block indefinitely | |
| await asyncio.Event().wait() |
There was a problem hiding this comment.
it's the same, but sure
|
Thank you for the review @graingert . Aside from those two comments are you ok with this PR? (should I merge if @crusaderky resolves those issues?) |
distributed/worker.py
Outdated
| for task in self._async_instructions: | ||
| task.cancel() | ||
| while self._async_instructions: | ||
| await asyncio.sleep(0) |
There was a problem hiding this comment.
probably will need an asyncio.gather(*self._async_instructions, return_exceptions=True) and something to handle close itself being cancelled
There was a problem hiding this comment.
if close itself is cancelled, it should not wait for other tasks being cancelled to return (which will happen at the next event loop cycle anyway)
There was a problem hiding this comment.
Tasks can handle the cancel eg:
async def example():
async with database():
await asyncio.Event().wait()The worker will need to wait for the async context manager to close
There was a problem hiding this comment.
First, this isn't a plugin system. We don't have any tasks there that catch CancelledError.
Second,
while self._async_instructions:
await asyncio.sleep(0)is doing exactly the same thing as the asyncio.gather you suggested.
There was a problem hiding this comment.
a user can submit an async function via c.submit(f, ev, key="f1") which is likely to suppress cancellation. using while self._async_instructions: await asyncio.sleep(0) burns the cpu while waiting for cancellation to be handled
There was a problem hiding this comment.
See latest commit
|
@graingert any further concerns here, or should I merge? |
Upon
Worker.close(), clean up the nanny asyncio tasks that were spawned to runWorker.execute().ensure_computingtransitions to newWorkerStateevent mechanism #5895Caveats