This is closely related to #6867 and dask/dask#9330.
There is a deadlock which is triggered by this code path in `distributed/worker.py` (lines 2152 to 2157 in f43bc47):
```python
if ts.state == "cancelled":
    logger.debug(
        "Trying to execute task %s which is not in executing state anymore",
        ts,
    )
    return AlreadyCancelledEvent(key=ts.key, stimulus_id=stimulus_id)
```
which in turn triggers this handler in `distributed/worker_state_machine.py` (lines 2931 to 2939 in f43bc47):
```python
@_handle_event.register
def _handle_already_cancelled(self, ev: AlreadyCancelledEvent) -> RecsInstrs:
    """Task is already cancelled by the time execute() runs"""
    # key *must* be still in tasks. Releasing it directly is forbidden
    # without going through cancelled
    ts = self.tasks.get(ev.key)
    assert ts, self.story(ev.key)
    ts.done = True
    return {ts: "released"}, []
```
The deadlock should be reproducible as follows:
1. `handle_stimulus(ComputeTaskEvent(key="x"))`
   `ts.state=executing`; an asyncio task is created for `Worker.execute`.
2. `handle_stimulus(FreeKeysEvent(keys=["x"]))`
   `ts.state=cancelled`
3. `await asyncio.sleep(0)`
   `Worker.execute` runs and returns `AlreadyCancelledEvent`. This causes the `_handle_stimulus_from_task` callback to be appended to the end of the event loop. However, the next step of the test suite is ahead of that callback in the event loop:
4. `handle_stimulus(ComputeTaskEvent(key="x"))`
   `ts.state=resumed`
5. `await ...` (anything that releases the event loop)
   This runs `_handle_stimulus_from_task`, which runs `_handle_already_cancelled`, which returns `{ts: "released"}`, which triggers the (resumed, released) transition, which sends the task to the cancelled state while the scheduler thinks it's running.
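To make the ordering easier to follow, here is a toy replay of the five steps in plain asyncio. Everything in it (`ToyWorker`, `ToyTaskState`, `compute_task`, `free_keys`, `on_task_done`) is hypothetical: it does not use the real `Worker` or `WorkerState` APIs, and its transitions only mimic the states listed in the steps above. It assumes, as the steps describe, that the result of `execute()` is handled by a done callback, which asyncio schedules at the back of the ready queue.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class ToyTaskState:
    """Hypothetical stand-in for the worker's TaskState: just a key and a state."""

    key: str
    state: str = "released"


class ToyWorker:
    """Hypothetical, heavily simplified model of the worker state machine.

    Only the transitions needed to replay the reported sequence are modelled.
    """

    def __init__(self) -> None:
        self.tasks: dict[str, ToyTaskState] = {}
        self.log: list[str] = []

    def compute_task(self, key: str) -> None:
        # Rough analogue of handle_stimulus(ComputeTaskEvent(key=key))
        ts = self.tasks.setdefault(key, ToyTaskState(key))
        if ts.state == "cancelled":
            # A cancelled task that is requested again is resumed, not re-run
            ts.state = "resumed"
        else:
            ts.state = "executing"
            task = asyncio.get_running_loop().create_task(self.execute(key))
            # When the task finishes, asyncio schedules this callback with
            # call_soon, i.e. at the end of the event loop's ready queue
            task.add_done_callback(self.on_task_done)
        self.log.append(f"compute {key}: {ts.state}")

    def free_keys(self, key: str) -> None:
        # Rough analogue of handle_stimulus(FreeKeysEvent(keys=[key]))
        ts = self.tasks[key]
        if ts.state == "executing":
            ts.state = "cancelled"
        self.log.append(f"free {key}: {ts.state}")

    async def execute(self, key: str) -> str | None:
        # Mirrors the early exit in Worker.execute: report "already cancelled"
        if self.tasks[key].state == "cancelled":
            return key
        return None

    def on_task_done(self, task: asyncio.Task) -> None:
        # Rough analogue of _handle_stimulus_from_task -> _handle_already_cancelled
        key = task.result()
        if key is None:
            return
        ts = self.tasks[key]
        # Like the real handler, this requests "released" without checking
        # which state the task is in *now*. Toy versions of the transitions:
        if ts.state == "resumed":
            ts.state = "cancelled"  # (resumed, released) -> cancelled
        elif ts.state == "cancelled":
            ts.state = "released"  # the intended, non-racy outcome
        self.log.append(f"already-cancelled callback {key}: {ts.state}")


async def replay() -> ToyWorker:
    w = ToyWorker()
    w.compute_task("x")     # 1. executing; execute() task created
    w.free_keys("x")        # 2. cancelled
    await asyncio.sleep(0)  # 3. execute() returns; its callback is queued behind us
    w.compute_task("x")     # 4. resumed: the scheduler expects x to run
    await asyncio.sleep(0)  # 5. stale callback fires: resumed -> cancelled
    return w


worker = asyncio.run(replay())
print("\n".join(worker.log))
```

Running it prints `compute x: executing`, `free x: cancelled`, `compute x: resumed` and finally `already-cancelled callback x: cancelled`: the stale callback from step 3 lands after the new compute request from step 4, so the task ends up cancelled even though the last stimulus asked for it to run.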
@fjetter @gjoseph92 my head is spinning.