Closed
Labels
- bug: Something is broken
- deadlock: The cluster appears to not make any progress
- stability: Issue or feature related to cluster stability (e.g. deadlock)
Description
Task stealing keeps references to WorkerState objects:
`distributed/distributed/stealing.py`, lines 245 to 251 in b8b45c6:

```python
self.in_flight[ts] = {
    "victim": victim,  # guaranteed to be processing_on
    "thief": thief,
    "victim_duration": victim_duration,
    "thief_duration": thief_duration,
    "stimulus_id": stimulus_id,
}
```
If a worker disconnects and then reconnects from the same address (it restarts, or the shutdown-reconnect bug #6354 happens), task stealing can still hold a reference to the old WorkerState object for that address, while the scheduler is working with the new WorkerState object for that address.
If tasks are assigned to this old, stale WorkerState, and then the worker leaves, the tasks will be forever stuck in processing (because they're not recognized as being on the worker that just left).
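To make the identity problem concrete, here is a minimal sketch. `ToyWorkerState` and the plain `workers` dict are hypothetical stand-ins for illustration, not the real `distributed` classes. It only shows that an address lookup can succeed while the held object is no longer the one registered under that address.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False keeps identity-based comparison, like a plain class
class ToyWorkerState:
    """Hypothetical stand-in for a scheduler-side worker record."""
    address: str
    processing: set = field(default_factory=set)


workers = {}  # stand-in for Scheduler.workers (address -> state object)
addr = "tcp://10.0.0.1:1234"

workers[addr] = ToyWorkerState(addr)
held = workers[addr]                  # reference kept elsewhere, e.g. by stealing

del workers[addr]                     # worker disconnects
workers[addr] = ToyWorkerState(addr)  # worker reconnects from the same address

same_address = held.address in workers          # address still looks valid
same_object = held is workers[held.address]     # but the object is stale
print(same_address, same_object)
```

An address check alone cannot tell the two objects apart; only an identity check can.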
Full trace-through
Copied from #6263 (comment)
- Stealing decides to move a task to worker X.
  - It queues a `steal-request` to worker Y (where the task is currently queued), asking it to cancel the task.
  - Stores a reference to the `victim` and `thief` `WorkerState`s (not addresses) in `WorkStealing.in_flight`.
- Worker X gets removed by the scheduler.
  - Its `WorkerState` instance, the one currently referenced by `WorkStealing.in_flight`, is removed from `Scheduler.workers`.
- Worker X heartbeats to the scheduler, reconnecting (bug described above).
  - A new `WorkerState` instance for it is added to `Scheduler.workers`, at the same address. The scheduler thinks nothing is processing on it.
- Worker Y finally replies, "hey yeah, it's all cool if you steal that task".
  - `move_task_confirm` handles this, and pops info about the stealing operation from `WorkStealing.in_flight`.
  - This info contains a reference to the `thief` `WorkerState` object. This is the old `WorkerState` instance, which is no longer in `Scheduler.workers`.
  - The `thief`'s address is in `scheduler.workers`, even though the `thief` object isn't.
  - The task gets assigned to a worker that, to the scheduler, no longer exists.
- When worker X actually shuts itself down, `Scheduler.remove_worker` goes to reschedule any tasks it's processing. But it's looking at the new `WorkerState` instance, and the task was assigned to the old one, so the task is never rescheduled.
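The sequence above can be replayed with a toy model. This is a sketch under stated assumptions: `ToyScheduler`, `ToyWorkerState`, and the `rescheduled` list are hypothetical and far simpler than the real scheduler. Only the order of events mirrors the trace.

```python
class ToyWorkerState:
    """Hypothetical worker record; not the real distributed.WorkerState."""
    def __init__(self, address):
        self.address = address
        self.processing = set()


class ToyScheduler:
    """Hypothetical scheduler tracking workers by address."""
    def __init__(self):
        self.workers = {}
        self.rescheduled = []

    def add_worker(self, address):
        self.workers[address] = ToyWorkerState(address)
        return self.workers[address]

    def remove_worker(self, address):
        # Only tasks on the *currently registered* object get rescheduled.
        ws = self.workers.pop(address)
        self.rescheduled.extend(sorted(ws.processing))
        ws.processing.clear()


sched = ToyScheduler()
x, y = "tcp://x:1", "tcp://y:1"
thief = sched.add_worker(x)
victim = sched.add_worker(y)
victim.processing.add("task-1")

# Stealing stores object references, not addresses.
in_flight = {"task-1": {"victim": victim, "thief": thief}}

sched.remove_worker(x)  # worker X is removed by the scheduler
sched.add_worker(x)     # worker X reconnects: new object, same address

# Worker Y confirms; the task is moved to the stale thief object.
d = in_flight.pop("task-1")
d["victim"].processing.discard("task-1")
d["thief"].processing.add("task-1")

sched.remove_worker(x)  # worker X really shuts down

stuck = "task-1" in d["thief"].processing and "task-1" not in sched.rescheduled
print(stuck)
```

In this model the task stays on the stale object and is never handed back for rescheduling, which matches the deadlock described.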
I think work stealing should either:
- Store the address and `id` of the `WorkerState` instance, instead of a direct reference, and verify that `id(scheduler.workers[addr]) == expected_id`. An advantage is that this avoids reference leaks of `WorkerState` objects (though they should eventually be cleaned up when the task completes; see "Ensure TaskState instances are released on Scheduler and Worker" #6250).
- Just verify that `d["thief"] is self.scheduler.workers[thief.address]`.
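Both options could look roughly like the sketch below. The helper names `thief_is_current` and `thief_is_current_by_id` and the `ToyWorkerState` class are hypothetical, and this is not the fix that was actually merged. One caveat with the `id`-based variant: in CPython an `id` is only unique among objects alive at the same time, so if nothing holds the old object, a later object might reuse its `id`.

```python
class ToyWorkerState:
    """Hypothetical worker record; not the real distributed.WorkerState."""
    def __init__(self, address):
        self.address = address


def thief_is_current(workers, d):
    """Option 2: the stored object must be the one registered at its address."""
    thief = d["thief"]
    return workers.get(thief.address) is thief


def thief_is_current_by_id(workers, address, expected_id):
    """Option 1: store (address, id) and compare against the live object."""
    ws = workers.get(address)
    return ws is not None and id(ws) == expected_id


workers = {}
old = ToyWorkerState("tcp://x:1")
workers[old.address] = old
d = {"thief": old}
expected_id = id(old)

before = thief_is_current(workers, d)

# Worker reconnects: a new object is registered under the same address.
workers[old.address] = ToyWorkerState(old.address)

after = thief_is_current(workers, d)
after_by_id = thief_is_current_by_id(workers, old.address, expected_id)
print(before, after, after_by_id)
```

If either check fails when the confirmation arrives, the steal would presumably be abandoned rather than assigning the task to the stale object.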