Deadlock fetching key from retiring worker, when scheduler thinks we already have the key #6244

@gjoseph92

Description

This is somewhat conjectural, based on a partial cluster dump from @bnaul (the full one wasn't working for some reason). Hopefully we can attach the dump at some point so others can see it. I only got to see the dump of a worker; if we could see the scheduler state, we could confirm this theory.
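
For anyone who wants to poke at a dump themselves: the worker_stories helper used below isn't shown here, but filtering the raw dump by hand is straightforward. The sketch below assumes the dump was written with Client.dump_cluster_state(..., format="yaml") and that each worker entry exposes its transition log under a "log" key; the exact layout can differ between distributed versions, so check your file.

import yaml

# Rough sketch: pull one key's story out of each worker's transition log in a
# raw cluster dump. Assumes Client.dump_cluster_state("dump", format="yaml")
# wrote the file and that worker state dicts contain their log under "log".
KEY = "('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)"

with open("dump.yaml") as f:
    dump = yaml.safe_load(f)

stories = {}
for addr, state in dump.get("workers", {}).items():
    entries = [
        entry
        for entry in state.get("log", [])
        if any(KEY in str(field) for field in entry)
    ]
    if entries:
        stories[addr] = entries

for addr, entries in stories.items():
    print(addr)
    for entry in entries:
        print("   ", entry)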

This was the worker story of the bad key:
In [9]: dump.worker_stories("('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)")
Out[9]: 
{'tcp://10.125.88.48:37099': [["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-task-exists',
   'released',
   'active_memory_manager-1651174357.5785856',
   datetime.datetime(2022, 4, 28, 13, 32, 37, 586432)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'released',
   'fetch',
   'fetch',
   {},
   'active_memory_manager-1651174357.5785856',
   datetime.datetime(2022, 4, 28, 13, 32, 37, 586584)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'compute-task',
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378109)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'release-key',
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378176)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'released',
   'released',
   {"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)": 'forgotten'},
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378205)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'released',
   'forgotten',
   'forgotten',
   {},
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378212)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'waiting',
   'forgotten',
   {"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)": 'forgotten'},
   'compute-task-1651174375.3724043',
   datetime.datetime(2022, 4, 28, 13, 32, 55, 378214)]],
 'tcp://10.124.15.5:35871': [["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-task-exists',
   'released',
   'active_memory_manager-1651174147.6869376',
   datetime.datetime(2022, 4, 28, 13, 29, 7, 714652)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'released',
   'fetch',
   'fetch',
   {},
   'active_memory_manager-1651174147.6869376',
   datetime.datetime(2022, 4, 28, 13, 29, 7, 714735)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-task-exists',
   'fetch',
   'active_memory_manager-1651174149.5793703',
   datetime.datetime(2022, 4, 28, 13, 29, 9, 587408)],
  ['receive-dep-failed',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)"],
   'ensure-communicating-1651174136.9395597',
   datetime.datetime(2022, 4, 28, 13, 31, 0, 999395)],
  ['missing-who-has',
   'tcp://10.124.227.26:35167',
   "('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-communicating-1651174136.9395597',
   datetime.datetime(2022, 4, 28, 13, 31, 0, 999403)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'missing',
   'missing',
   {},
   'ensure-communicating-1651174136.9395597',
   datetime.datetime(2022, 4, 28, 13, 31, 0, 999475)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'missing',
   'fetch',
   'fetch',
   {},
   'find-missing-1651174261.5426853',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 556907)],
  ['gather-dependencies',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)"],
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 557029)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'fetch',
   'flight',
   'flight',
   {},
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 557074)],
  ['request-dep',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)"],
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 31, 1, 557221)],
  ['receive-dep-failed',
   'tcp://10.124.227.26:35167',
   ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
    "('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)",
    "('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)"],
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 33, 5, 778931)],
  ['missing-who-has',
   'tcp://10.124.227.26:35167',
   "('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 33, 5, 778938)],
  ["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
   'flight',
   'missing',
   'missing',
   {},
   'ensure-communicating-1651174261.556927',
   datetime.datetime(2022, 4, 28, 13, 33, 5, 779073)]]}

In #6112, we fixed the gather_dep logic to transition to missing all of the keys a particular worker holds when communication with that worker fails. We assume the find_missing PeriodicCallback will then talk to the scheduler and ask where to fetch them from next.
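
Roughly, that callback looks like this (paraphrased from worker.py, with retry and bookkeeping details omitted; see the diff at the bottom of this issue for the real code around the who_has call):

async def find_missing(self) -> None:
    # Paraphrase: ask the scheduler who currently holds every key stuck in
    # "missing", record the answer via update_who_has, and re-recommend to
    # "fetch" anything that now appears to have a holder. If the answer is
    # empty (or only names this worker), nothing is recommended and the key
    # simply stays in "missing" until the next run.
    if not self._missing_dep_flight:
        return
    stimulus_id = f"find-missing-{time()}"  # matches the find-missing-... ids in the story above
    who_has = await self.scheduler.who_has(
        keys=[ts.key for ts in self._missing_dep_flight]
    )
    who_has = {k: v for k, v in who_has.items() if v}
    self.update_who_has(who_has)
    recommendations = {}
    for ts in self._missing_dep_flight:
        if ts.who_has:
            recommendations[ts] = "fetch"
    self.transitions(recommendations, stimulus_id=stimulus_id)
    self.ensure_communicating()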

In the case of other errors, we would inform the scheduler of this:

elif ts not in recommendations:
    ts.who_has.discard(worker)
    self.has_what[worker].discard(ts.key)
    self.log.append((d, "missing-dep", stimulus_id, time()))
    self.batched_stream.send(
        {
            "op": "missing-data",
            "errant_worker": worker,
            "key": d,
            "stimulus_id": stimulus_id,
        }
    )

However, in the OSError case, we don't send the missing-data message (because we've already added recommendations for those keys to missing).
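
As a rough sketch of what also informing the scheduler there could look like (variable names are borrowed from the surrounding gather_dep code, and this is simplified; a real fix would probably batch this rather than send one stream message per key, see the suggestions below):

except OSError:
    # ... existing handling from #6112 (simplified): recommend every key this
    # worker was supposed to hold to "missing" ...
    for key in self.has_what[worker]:
        recommendations[self.tasks[key]] = "missing"
        # Sketch: additionally tell the scheduler that `worker` doesn't have
        # the key, so scheduler-side who_has stops pointing at an unreachable
        # (or already retired) worker.
        self.batched_stream.send(
            {
                "op": "missing-data",
                "errant_worker": worker,
                "key": key,
                "stimulus_id": stimulus_id,
            }
        )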


Simultaneously, AMM is now used for retire_workers. When retiring a worker, we try to move all its keys onto other workers. To do this, AMM calls Scheduler.request_acquire_replicas:

def request_acquire_replicas(self, addr: str, keys: list, *, stimulus_id: str):
    """Asynchronously ask a worker to acquire a replica of the listed keys from
    other workers. This is a fire-and-forget operation which offers no feedback for
    success or failure, and is intended for housekeeping and not for computation.
    """
    who_has = {}
    for key in keys:
        ts = self.tasks[key]
        who_has[key] = {ws.address for ws in ts.who_has}
    self.stream_comms[addr].send(
        {
            "op": "acquire-replicas",
            "keys": keys,
            "who_has": who_has,
            "stimulus_id": stimulus_id,
        },
    )

request_acquire_replicas optimistically adds the key to the worker's who_has before the worker has actually confirmed that it has received it. So imagine this flow:

  • Worker A is retiring gracefully. It holds key-foo
  • AMM calls request_acquire_replicas("worker-b-addr", ["key-foo"]).
    • key-foo is immediately assigned to worker B from the scheduler's perspective
    • Scheduler tells worker B: "go fetch key-foo from worker A"
  • The next AMM cycle runs. AMM thinks, "both worker A and worker B hold key-foo now" (not actually true), so: "I'll tell worker A to drop its copy." request_remove_replicas also eagerly updates scheduler state so that worker A no longer holds the key (also not yet true, but a more reasonable assumption).
  • The next AMM cycle runs. AMM thinks, "worker A holds no keys anymore; it can shut down."
  • Worker A shuts down
  • Worker B was busy, so it didn't get around to requesting key-foo from worker A until it was too late: worker A had already shut down, so the key is lost.
  • The OSError happens trying to fetch the key. The key goes to missing on worker B.
  • In find_missing, Worker B asks the scheduler, "who has key-foo"?
  • Scheduler replies, "just you do!"
  • Worker B: 🤔

I can't quite figure out what happens from here. I would have hoped that eventually worker B would try to gather_dep from itself, reply to itself "I don't have that key", and then finally send the missing-data signal to the scheduler, letting the scheduler realize the key is actually missing. From the cluster dump I'm looking at, where this situation happened, that doesn't seem to be happening.

A few things to consider changing:

  • request_acquire_replicas should not be so optimistic as to assume the key is on a new worker before we've confirmed it's there. This creates a race condition where we may drop the key from the sender before the receiver has gotten it, which means RetireWorker (and all AMM policies) may lose data when they shouldn't (see the sketch after this list). cc @crusaderky
  • maybe the OSError handling logic should also send missing-data to the scheduler here-ish (not in a for-loop though; there's a rough sketch of this above)? cc @mrocklin
  • Worker.find_missing should definitely take action if the scheduler reports nobody (besides it) holds the key that's missing. That's a clear state mismatch, and we shouldn't assume anything else will resolve it. cc @fjetter @mrocklin
  • Include Worker._missing_dep_flight in cluster dumps #6243
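
For the first bullet, one possible shape is to track replicas that have merely been requested separately from replicas a worker has actually confirmed. This is only a sketch: pending_acquisitions and confirmed_elsewhere are made-up names, not existing Scheduler attributes or methods.

# Sketch only: keep "requested" replicas out of the confirmed set, so the last
# confirmed copy of a key can never be dropped. All names below are
# hypothetical, not existing Scheduler internals.

def request_acquire_replicas(self, addr: str, keys: list, *, stimulus_id: str):
    ...
    for key in keys:
        # Record the in-flight transfer instead of eagerly treating `addr`
        # as a holder of the key.
        self.pending_acquisitions[key].add(addr)

def confirmed_elsewhere(self, key: str, addr: str) -> bool:
    # True iff some worker other than `addr` holds a replica it has actually
    # confirmed (i.e. not just a pending acquire-replicas request).
    ts = self.tasks[key]
    return any(
        ws.address != addr and ws.address not in self.pending_acquisitions[key]
        for ws in ts.who_has
    )

def request_remove_replicas(self, addr: str, keys: list, *, stimulus_id: str):
    # Only ask `addr` to drop keys whose data is confirmed somewhere else;
    # RetireWorker would likewise only consider a key handled once it is
    # confirmed elsewhere.
    safe = [key for key in keys if self.confirmed_elsewhere(key, addr)]
    ...

Whether the pending bookkeeping lives on the scheduler or inside AMM doesn't matter much for the race; the point is just that "scheduled to receive" and "confirmed received" must be distinguishable.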

I think maybe a change like this would be a good approach (note this doesn't plumb stimulus_id through in other calls to update_who_has, so it will fail as is; also not sure if update_who_has is always the right place to do this):

diff --git a/distributed/worker.py b/distributed/worker.py
index e5563ed0..04c6bfd6 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3202,7 +3202,7 @@ class Worker(ServerNode):
                 keys=[ts.key for ts in self._missing_dep_flight],
             )
             who_has = {k: v for k, v in who_has.items() if v}
-            self.update_who_has(who_has)
+            self.update_who_has(who_has, stimulus_id=stimulus_id)
             recommendations: Recs = {}
             for ts in self._missing_dep_flight:
                 if ts.who_has:
@@ -3216,7 +3216,9 @@ class Worker(ServerNode):
             ].callback_time = self.periodic_callbacks["heartbeat"].callback_time
             self.ensure_communicating()
 
-    def update_who_has(self, who_has: dict[str, Collection[str]]) -> None:
+    def update_who_has(
+        self, who_has: dict[str, Collection[str]], *, stimulus_id: str
+    ) -> None:
         try:
             for dep, workers in who_has.items():
                 if not workers:
@@ -3232,6 +3234,21 @@ class Worker(ServerNode):
                         )
                         # Do not mutate the input dict. That's rude
                         workers = set(workers) - {self.address}
+                        if not workers:
+                            logger.warning(
+                                f"Scheduler claims this worker {self.address} is the only one holding {dep!r}, which is not true. "
+                                f"{dep!r} likely needs to be recomputed."
+                            )
+                            self.batched_stream.send(
+                                {
+                                    "op": "missing-data",
+                                    "errant_worker": self.address,
+                                    "key": dep,
+                                    "stimulus_id": stimulus_id,
+                                }
+                            )
+                            continue
+
                     dep_ts.who_has.update(workers)
 
                     for worker in workers:
