-
-
Notifications
You must be signed in to change notification settings - Fork 757
Labels
bugSomething is brokenSomething is brokendeadlockThe cluster appears to not make any progressThe cluster appears to not make any progressscheduler
Description
While trying to reproduce #7063, I came across a different error, this one with queueing enabled.
The below reproducer is NOT minimal - there is likely quite a bit of simplification possible.
@gen_cluster(client=True, nthreads=[("", 1)], config={"distributed.scheduler.worker-saturation": 1.5})
async def test_steal_rootish_while_retiring(c, s, a):
"""https://github.com/dask/distributed/issues/7063
Note that this applies to both tasks that raise Reschedule as well as work stealing.
"""
ev = Event()
# Put a task in memory on a, which will be retired, and prevent b from acquiring
# a replica. This will cause a to be stuck in closing_gracefully state until we
# set b.block_gather_dep.
m = c.submit(inc, 1, key="m", workers=[a.address])
await wait(m)
async with BlockedGatherDep(s.address, nthreads=1) as b:
# Large number of tasks to make sure they're rootish
futures = c.map(
lambda i, ev: ev.wait(), range(10), ev=ev, key=[f"x-{i}" for i in range(10)]
)
while a.state.executing_count != 1 or b.state.executing_count != 1:
await asyncio.sleep(0.01)
assert s.is_rootish(s.tasks[futures[0].key])
retire_task = asyncio.create_task(c.retire_workers([a.address]))
# Wait until AMM sends AcquireReplicasEvent to b to move away m
await b.in_gather_dep.wait()
assert s.workers[a.address].status == Status.closing_gracefully
# Steal any of the tasks on a
steal_key = next(iter(a.state.executing)).key
s.reschedule(steal_key, stimulus_id="steal")
await ev.set()
# The stolen task can now complete on the other worker
await wait_for_state(steal_key, "memory", b)
await wait_for_state(steal_key, "memory", s)
# Let graceful retirement of a complete.
# This in turn reschedules whatever tasks were still processing on a to b.
b.block_gather_dep.set()
await retire_task
await wait(futures)The test is green; however I read in the log:
File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 5284, in handle_task_finished
r: tuple = self.stimulus_task_finished(
File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 4649, in stimulus_task_finished
r: tuple = self._transition(
File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 1813, in _transition
assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:45929', 'nbytes': 28, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x04bool\x94\x93\x94.', 'typename': 'bool', 'metadata': {}, 'thread': 139862053221952, 'startstops': ({'action': 'compute', 'start': 1666802403.9580944, 'stop': 1666802403.9590282},), 'status': 'OK'}, 'queued', 'memory')
What is happening:
- steal_key is processing on a
- steal_key is rescheduled, which causes the scheduler to send a free-keys message to a and put the task back in queue
- before the free-keys message can reach a, steal_key finishes on a
- steal_key transitions to memory on a, sending a TaskFinishedMsg to the scheduler.
- a queued->memory transition happens which, I suspect, is otherwise untested.
This is timing-sensitive; if free-keys reached a before the task end, then steal_key would be cancelled and transition to forgotten without any messaging when it ends.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
bugSomething is brokenSomething is brokendeadlockThe cluster appears to not make any progressThe cluster appears to not make any progressscheduler