This is somewhat conjecture based on a partial cluster dump from @bnaul (full one wasn't working for some reason). Hopefully we can attach the dump at some point so others can see. I only got to see the dump of a worker. If we could see the scheduler state, we could confirm this theory.
This was the worker story of the bad key
In [9]: dump.worker_stories("('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)")
Out[9]:
{'tcp://10.125.88.48:37099': [["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'ensure-task-exists',
'released',
'active_memory_manager-1651174357.5785856',
datetime.datetime(2022, 4, 28, 13, 32, 37, 586432)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'released',
'fetch',
'fetch',
{},
'active_memory_manager-1651174357.5785856',
datetime.datetime(2022, 4, 28, 13, 32, 37, 586584)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'compute-task',
'compute-task-1651174375.3724043',
datetime.datetime(2022, 4, 28, 13, 32, 55, 378109)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'release-key',
'compute-task-1651174375.3724043',
datetime.datetime(2022, 4, 28, 13, 32, 55, 378176)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'fetch',
'released',
'released',
{"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)": 'forgotten'},
'compute-task-1651174375.3724043',
datetime.datetime(2022, 4, 28, 13, 32, 55, 378205)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'released',
'forgotten',
'forgotten',
{},
'compute-task-1651174375.3724043',
datetime.datetime(2022, 4, 28, 13, 32, 55, 378212)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'fetch',
'waiting',
'forgotten',
{"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)": 'forgotten'},
'compute-task-1651174375.3724043',
datetime.datetime(2022, 4, 28, 13, 32, 55, 378214)]],
'tcp://10.124.15.5:35871': [["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'ensure-task-exists',
'released',
'active_memory_manager-1651174147.6869376',
datetime.datetime(2022, 4, 28, 13, 29, 7, 714652)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'released',
'fetch',
'fetch',
{},
'active_memory_manager-1651174147.6869376',
datetime.datetime(2022, 4, 28, 13, 29, 7, 714735)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'ensure-task-exists',
'fetch',
'active_memory_manager-1651174149.5793703',
datetime.datetime(2022, 4, 28, 13, 29, 9, 587408)],
['receive-dep-failed',
'tcp://10.124.227.26:35167',
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)"],
'ensure-communicating-1651174136.9395597',
datetime.datetime(2022, 4, 28, 13, 31, 0, 999395)],
['missing-who-has',
'tcp://10.124.227.26:35167',
"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'ensure-communicating-1651174136.9395597',
datetime.datetime(2022, 4, 28, 13, 31, 0, 999403)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'fetch',
'missing',
'missing',
{},
'ensure-communicating-1651174136.9395597',
datetime.datetime(2022, 4, 28, 13, 31, 0, 999475)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'missing',
'fetch',
'fetch',
{},
'find-missing-1651174261.5426853',
datetime.datetime(2022, 4, 28, 13, 31, 1, 556907)],
['gather-dependencies',
'tcp://10.124.227.26:35167',
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)"],
'ensure-communicating-1651174261.556927',
datetime.datetime(2022, 4, 28, 13, 31, 1, 557029)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'fetch',
'flight',
'flight',
{},
'ensure-communicating-1651174261.556927',
datetime.datetime(2022, 4, 28, 13, 31, 1, 557074)],
['request-dep',
'tcp://10.124.227.26:35167',
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)"],
'ensure-communicating-1651174261.556927',
datetime.datetime(2022, 4, 28, 13, 31, 1, 557221)],
['receive-dep-failed',
'tcp://10.124.227.26:35167',
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 5726)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 9555)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 7277)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 257)",
"('read-parquet-4eae21bcb0e0252ce122c72c6dfb099a', 2764)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 2732)",
"('_split-bec215abf9d605cb60c27727e9a88b7e', 8041)"],
'ensure-communicating-1651174261.556927',
datetime.datetime(2022, 4, 28, 13, 33, 5, 778931)],
['missing-who-has',
'tcp://10.124.227.26:35167',
"('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'ensure-communicating-1651174261.556927',
datetime.datetime(2022, 4, 28, 13, 33, 5, 778938)],
["('_split-bec215abf9d605cb60c27727e9a88b7e', 1126)",
'flight',
'missing',
'missing',
{},
'ensure-communicating-1651174261.556927',
datetime.datetime(2022, 4, 28, 13, 33, 5, 779073)]]}

In #6112, we fixed the gather_dep logic so that, if communication with a worker fails, all keys held by that worker are transitioned to missing. We assume the find_missing PeriodicCallback will then talk to the scheduler and ask where to fetch them from next.
In the case of other errors, we would inform the scheduler of this:
distributed/distributed/worker.py, lines 3156 to 3167 in b837003:

```python
elif ts not in recommendations:
    ts.who_has.discard(worker)
    self.has_what[worker].discard(ts.key)
    self.log.append((d, "missing-dep", stimulus_id, time()))
    self.batched_stream.send(
        {
            "op": "missing-data",
            "errant_worker": worker,
            "key": d,
            "stimulus_id": stimulus_id,
        }
    )
```
However, in the OSError case, we don't send the missing-data message (because we've already added recommendations for those keys to missing).
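To make the asymmetry between the two failure paths concrete, here is a toy model of the behavior described above. This is not the real distributed API: `handle_gather_failure` and its parameters are invented for illustration, and the real code paths carry far more state.

```python
# Toy model (invented names, NOT the real dask.distributed API) of the two
# gather_dep failure paths: a comm failure (OSError) silently recommends
# "missing" for every key on that worker, while other errors report each
# key to the scheduler via a "missing-data" message.

def handle_gather_failure(exc, keys_on_worker, worker, send_to_scheduler):
    """Return per-key state recommendations after a failed gather."""
    recommendations = {}
    if isinstance(exc, OSError):
        # Comm failure: keys go to "missing"; per #6112 we rely on the
        # find_missing PeriodicCallback to re-ask the scheduler later.
        for key in keys_on_worker:
            recommendations[key] = "missing"
    else:
        # Other errors: the scheduler is told the worker is errant.
        for key in keys_on_worker:
            send_to_scheduler(
                {"op": "missing-data", "errant_worker": worker, "key": key}
            )
    return recommendations

sent = []
recs = handle_gather_failure(
    OSError("comm closed"), ["key-foo", "key-bar"], "tcp://a:1", sent.append
)
# In the OSError path the scheduler is never informed: sent stays empty.
```

The point of the sketch is that the OSError branch produces no outbound message at all, so the scheduler's (possibly wrong) who_has is never corrected by this path.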
Simultaneously, AMM is now used for retire_workers. When retiring a worker, we try to move all its keys onto other workers. To do this, AMM calls Scheduler.request_acquire_replicas:
distributed/distributed/scheduler.py, lines 7209 to 7226 in b837003:

```python
def request_acquire_replicas(self, addr: str, keys: list, *, stimulus_id: str):
    """Asynchronously ask a worker to acquire a replica of the listed keys from
    other workers. This is a fire-and-forget operation which offers no feedback for
    success or failure, and is intended for housekeeping and not for computation.
    """
    who_has = {}
    for key in keys:
        ts = self.tasks[key]
        who_has[key] = {ws.address for ws in ts.who_has}
    self.stream_comms[addr].send(
        {
            "op": "acquire-replicas",
            "keys": keys,
            "who_has": who_has,
            "stimulus_id": stimulus_id,
        },
    )
```
`request_acquire_replicas` optimistically adds the key to the worker's who_has before the worker has actually confirmed it has received it. So imagine this flow:
- Worker A is retiring gracefully. It holds `key-foo`.
- AMM calls `request_acquire_replicas("worker-b-addr", ["key-foo"])`. `key-foo` is immediately assigned to worker B from the scheduler's perspective.
- The scheduler tells worker B: "go fetch `key-foo` from worker A".
- The next AMM cycle runs. AMM thinks, "both worker A and B hold `key-foo` now" (not actually true), "I'll tell worker A to drop its copy." `request_remove_replicas` also eagerly updates scheduler state so worker A no longer holds the key (this is also not true, but it's more reasonable).
- The next AMM cycle runs. AMM thinks, "worker A holds no keys anymore. It can shut down."
- Worker A shuts down.
- Worker B was busy, so it didn't get around to requesting `key-foo` from worker A until too late. Worker A has already shut down. The key is lost.
- The `OSError` happens trying to fetch the key. The key goes to `missing` on worker B.
- In `find_missing`, worker B asks the scheduler, "who has `key-foo`?"
- The scheduler replies, "just you do!"
- Worker B: 🤔
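The race in the flow above can be reduced to a few lines of bookkeeping. This is a deliberately tiny simulation with invented names (plain dicts and sets, not the real AMM or scheduler classes), just to show how the optimistic updates let the only real copy disappear:

```python
# Toy simulation of the retire_workers race: the scheduler's optimistic
# who_has bookkeeping lets worker A shut down before worker B has
# actually fetched the key. All names here are invented for illustration.

who_has = {"key-foo": {"A"}}                  # scheduler's view per key
worker_data = {"A": {"key-foo"}, "B": set()}  # ground truth on the workers

# AMM asks B to acquire a replica; the scheduler records it immediately,
# before B has confirmed anything (request_acquire_replicas is optimistic).
who_has["key-foo"].add("B")

# Next AMM cycle: both A and B appear to hold the key, so drop A's copy
# (request_remove_replicas also eagerly updates scheduler state).
who_has["key-foo"].discard("A")

# Next AMM cycle: A appears to hold nothing, so A is allowed to shut down,
# even though B (which was busy) never fetched the key from it.
a_keys = {k for k, workers in who_has.items() if "A" in workers}
assert not a_keys        # the scheduler believes A is empty
del worker_data["A"]     # A shuts down; the only real copy is gone

# B finally tries to fetch, gets an OSError, and the key goes to "missing".
# find_missing then gets the useless answer: "who has key-foo? just you!"
```

At the end of the simulation the scheduler's who_has says `{"B"}` while worker B's actual data is empty: exactly the state mismatch the cluster dump appears to show.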
I can't quite figure out what happens from here. I would have hoped that eventually worker B would try to `gather_dep` from itself, reply to itself "I don't have that key", and then finally send the `missing-data` signal to the scheduler, letting the scheduler realize the key is actually missing. Judging from the cluster dump I'm looking at, where this situation happened, that doesn't seem to occur.
A few things to consider changing:
- `request_acquire_replicas` should not be so optimistic and assume the key is on a new worker before we've confirmed it's there. This creates a race condition, where we may drop the key from the sender before the receiver has gotten it. This means `RetireWorker` (and all AMM policies) may lead to lost data, when they shouldn't. cc @crusaderky
- Maybe the `OSError` handling logic should also send `missing-data` to the scheduler here-ish (not in a for-loop though)? cc @mrocklin
- `Worker.find_missing` should definitely take action if the scheduler reports that nobody (besides it) holds the key that's missing. That's a clear state mismatch, and we shouldn't assume anything else will resolve it. cc @fjetter @mrocklin
- Include `Worker._missing_dep_flight` in cluster dumps #6243
I think maybe a change like this would be a good approach (note: this doesn't plumb `stimulus_id` through in other calls to `update_who_has`, so it will fail as-is; I'm also not sure `update_who_has` is always the right place to do this):
```diff
diff --git a/distributed/worker.py b/distributed/worker.py
index e5563ed0..04c6bfd6 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -3202,7 +3202,7 @@ class Worker(ServerNode):
                 keys=[ts.key for ts in self._missing_dep_flight],
             )
             who_has = {k: v for k, v in who_has.items() if v}
-            self.update_who_has(who_has)
+            self.update_who_has(who_has, stimulus_id=stimulus_id)
             recommendations: Recs = {}
             for ts in self._missing_dep_flight:
                 if ts.who_has:
@@ -3216,7 +3216,9 @@ class Worker(ServerNode):
             ].callback_time = self.periodic_callbacks["heartbeat"].callback_time
             self.ensure_communicating()
 
-    def update_who_has(self, who_has: dict[str, Collection[str]]) -> None:
+    def update_who_has(
+        self, who_has: dict[str, Collection[str]], *, stimulus_id: str
+    ) -> None:
         try:
             for dep, workers in who_has.items():
                 if not workers:
@@ -3232,6 +3234,21 @@ class Worker(ServerNode):
                     )
                     # Do not mutate the input dict. That's rude
                     workers = set(workers) - {self.address}
+                    if not workers:
+                        logger.warning(
+                            f"Scheduler claims this worker {self.address} is the only one holding {dep!r}, which is not true. "
+                            f"{dep!r} likely needs to be recomputed."
+                        )
+                        self.batched_stream.send(
+                            {
+                                "op": "missing-data",
+                                "errant_worker": self.address,
+                                "key": dep,
+                                "stimulus_id": stimulus_id,
+                            }
+                        )
+                        continue
+
                     dep_ts.who_has.update(workers)
                     for worker in workers:
```
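The guard in the diff above can be modeled in isolation. This is a minimal sketch, not the real `Worker.update_who_has`: `usable_sources` and its parameters are invented names, and the real method mutates task state rather than returning a dict, but the decision logic is the same.

```python
# Minimal model of the proposed guard: after removing our own address from
# a key's who_has, if nobody is left to fetch from, the scheduler's state
# must be wrong, so report "missing-data" instead of recording ourselves
# as a source. Function and parameter names are invented for illustration.

def usable_sources(who_has, self_address, send_to_scheduler, stimulus_id):
    """Return the fetchable sources per key, reporting impossible entries."""
    result = {}
    for dep, workers in who_has.items():
        # Do not mutate the input, and never plan to fetch from ourselves.
        workers = set(workers) - {self_address}
        if not workers:
            # Scheduler claims only we hold this key: clear state mismatch,
            # so tell the scheduler the key is effectively lost.
            send_to_scheduler(
                {
                    "op": "missing-data",
                    "errant_worker": self_address,
                    "key": dep,
                    "stimulus_id": stimulus_id,
                }
            )
            continue
        result[dep] = workers
    return result

sent = []
out = usable_sources(
    {"key-foo": ["tcp://b:1"], "key-bar": ["tcp://a:1", "tcp://b:1"]},
    "tcp://b:1",
    sent.append,
    "find-missing-123",
)
# key-foo has no source other than ourselves, so it is reported and
# excluded; key-bar keeps tcp://a:1 as a fetchable source.
```

With this behavior, the "just you do!" reply from `find_missing` would immediately surface as a `missing-data` message instead of leaving the key stuck in `missing` forever.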