Automatically restart P2P shuffles when output worker leaves #7970

hendrikmakait merged 88 commits into dask:main
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files ±0, 20 suites ±0, 11h 35m 10s ⏱️ +10h 14m 44s

For more details on these failures, see this check. Results for commit 5764f45. ± Comparison against base commit efc7eeb. This pull request skips 1 and un-skips 138 tests.

♻️ This comment has been updated with latest results.
This reverts commit 853d953.
@wence-: Your feedback should be addressed and blocking PRs are merged, so this should be good for another round.
phofl left a comment

Generally looks good to me, but I'm not familiar enough with the code base to be the final reviewer.
fjetter left a comment
Most of my comments are nits and you can ignore or use them.
The one thing I'm concerned about is that the coverage report indicates some rather critical code sections are not covered. We should look into that before merging
```diff
 try:
-    shuffle = self.states[shuffle_id]
+    shuffle = self.active_shuffles[shuffle_id]
 except KeyError:
-    return
-self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
-self._clean_on_scheduler(shuffle_id)
+    pass
+else:
+    self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
+    self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)
```
This is more a style note, and I typically try to avoid style questions in a PR review. Still, this feels a bit convoluted. I believe something like

```python
if shuffle := self.active_shuffles.get(shuffle_id):
    self._fail_on_workers(shuffle, message=f"{shuffle} forgotten")
    self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)
elif finish == "forgotten":
    ...
```

is easier to read than try/except/pass/else, with or without the walrus.
Feel free to ignore. Logic is the same in the end
Fair point, this bit has gotten out of hand.
```python
def __eq__(self, other: Any) -> bool:
    return type(other) == type(self) and other.run_id == self.run_id
```
This appears not to be covered by tests. Why is it necessary, then?
It had some use in an earlier iteration. Removed.
My guess is that because __hash__ is no longer the default, this addition of __eq__ ensures that __eq__ and the newly defined __hash__ are consistent.
I suppose this is because the run_id is a unique token that defines the shuffle state object.
> I suppose this is because the run_id is a unique token that defines the shuffle state object.

That's why I initially had a new __eq__. Then __hash__ had to match it. Now I'm only using __hash__, so I think there's no need for a custom __eq__ that could potentially become outdated.
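The hash/eq interplay discussed here can be shown with a minimal sketch (an illustrative stand-in, not the actual shuffle state class): defining only __hash__ leaves Python's default identity-based __eq__ in place, so there is no custom __eq__ that could drift out of sync with the hash.

```python
class ShuffleRunSketch:
    """Illustrative stand-in for the shuffle state object (not the real
    class): hashed by its unique run_id, compared by identity (default)."""

    def __init__(self, run_id: int) -> None:
        self.run_id = run_id

    def __hash__(self) -> int:
        # Defining only __hash__ keeps the default identity-based __eq__,
        # so no custom __eq__ exists that could become outdated.
        return self.run_id


a = ShuffleRunSketch(1)
b = ShuffleRunSketch(1)

print(hash(a) == hash(b))  # True: both hash on run_id
print(a == b)              # False: default __eq__ compares identity
print(a in {a})            # True: set lookup uses hash, then identity
```

Equal hashes with unequal objects are just a hash collision, which sets and dicts handle fine.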
distributed/shuffle/_shuffle.py
Outdated
```python
except ShuffleClosedError:
    raise Reschedule()
```
This lack of coverage makes me nervous. I think there are various races around the barrier that we should test.

I haven't been able to come up with a scenario where this would be triggered (and relevant), so I've removed it for now. If this ever pops up for somebody, I hope they'll send a bug report our way.
```python
self._clean_on_scheduler(shuffle_id, stimulus_id=stimulus_id)

if finish == "forgotten":
    shuffles = self._shuffles.pop(shuffle_id)
```
IIUC this entire logic is just there to clean up. Behavior would not be impacted if we didn't do any of this, correct?

Yes, this is state cleanup on the scheduler plugin.
```python
recs.update({dt.key: "released"})

if barrier_task.state == "erred":
    return {}  # pragma: no cover
```
Why don't you want coverage to detect this? It seems like an important case.

Added a comment to explain.

This seems like an ideal case for `assert False, "Invariant broken"`?

That would also work. I'm wondering if assert False is the right thing to add here, given that PYTHONOPTIMIZE will strip it. It would work as an addition, though.

Raising a RuntimeError now.
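The PYTHONOPTIMIZE concern can be demonstrated directly: under `-O` Python compiles assert statements away entirely, while an explicit raise survives. A small self-contained sketch:

```python
import subprocess
import sys

# A script whose assert would normally stop execution on the first line.
script = "assert False, 'Invariant broken'\nprint('assert was stripped')"

# Under -O (equivalent to PYTHONOPTIMIZE=1), asserts are compiled away:
stripped = subprocess.run(
    [sys.executable, "-O", "-c", script], capture_output=True, text=True
)
print(stripped.stdout.strip())  # the print runs because the assert is gone

# An explicit raise survives optimization and still fails the process:
raised = subprocess.run(
    [sys.executable, "-O", "-c", "raise RuntimeError('Invariant broken')"],
    capture_output=True,
    text=True,
)
print(raised.returncode != 0)  # True: RuntimeError still fires under -O
```

This is why a RuntimeError (or a dedicated invariant-violation exception) is the safer choice for guarding invariants that must hold in production.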
```python
for dt in barrier_task.dependencies:
    if dt.state == "erred":
        return {}  # pragma: no cover
```
Added a comment to explain.
```python
while self._runs:
    await asyncio.sleep(0.1)
```
I don't insist on this, but I don't like these sleep patterns.

```python
def __init__(...):
    self._runs_condition = asyncio.Condition()

async def _close_shuffle_run(self, shuffle: ShuffleRun) -> None:
    await shuffle.close()
    async with self._runs_condition:
        self._runs.remove(shuffle)
        self._runs_condition.notify_all()

async def teardown(self, worker: Worker) -> None:
    ...
    async with self._runs_condition:
        await self._runs_condition.wait_for(lambda: not self._runs)
```

would be a clean alternative. Many people consider Conditions too complex, but what I like about them is that they make this relationship very clear (and they unblock immediately, which is nice for testing and such things).

As I said, I don't insist on this.
A more serious question: is it possible for _runs to be repopulated at this point, or are we locking everything up properly so that this cannot happen once we reach this point?

Makes sense! I've added the condition. At this point the plugin is closed, which will raise a ShuffleClosedError before a new run can be added.

_runs is added to in _refresh_shuffle, which doesn't have a lock associated with it. But I am not sure whether that can run simultaneously with teardown.
teardown sets:

distributed/distributed/shuffle/_worker_plugin.py, lines 890 to 893 (at 90eb9ea)

Once that is done:

distributed/distributed/shuffle/_worker_plugin.py, lines 810 to 811 (at 90eb9ea)
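The condition-based teardown suggested above can be sketched as a runnable toy (the class and method names here are illustrative, not the actual worker-plugin API): closing a run removes it from the set and notifies waiters, while teardown blocks on the condition until the set drains, with no polling sleep.

```python
import asyncio


class RunsTracker:
    """Toy sketch of the condition-based cleanup pattern (not the
    actual ShuffleWorkerPlugin)."""

    def __init__(self) -> None:
        self._runs: set[str] = set()
        self._runs_cleanup_condition = asyncio.Condition()

    async def close_run(self, run: str) -> None:
        # Remove the run and wake anyone waiting for _runs to drain.
        async with self._runs_cleanup_condition:
            self._runs.discard(run)
            self._runs_cleanup_condition.notify_all()

    async def teardown(self) -> None:
        # Unblocks as soon as the last run is closed -- no sleep loop.
        async with self._runs_cleanup_condition:
            await self._runs_cleanup_condition.wait_for(
                lambda: not self._runs
            )


async def main() -> RunsTracker:
    tracker = RunsTracker()
    tracker._runs.update({"run-1", "run-2"})
    # Schedule the closers; teardown will wait until both have finished.
    closers = asyncio.gather(
        tracker.close_run("run-1"), tracker.close_run("run-2")
    )
    await asyncio.wait_for(tracker.teardown(), timeout=1)
    await closers
    return tracker


tracker = asyncio.run(main())
print("runs left after teardown:", len(tracker._runs))
```

Note the sketch deliberately ignores the repopulation question raised above: a real implementation must also guarantee that nothing can add to `_runs` once teardown has begun.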
```python
@gen_cluster(
    client=True,
    nthreads=[("", 1)] * 2,
    config={"distributed.scheduler.allowed-failures": 0},
)
```
Does this mean that P2P is now retried allowed-failures times if a worker goes OOM?

Wouldn't be a dealbreaker, but I also don't think this is useful. It's very unlikely that another P2P run attempt would be more successful. However, there are of course also cases like spot interruption where this matters... Never mind!

> Does this mean that P2P is now retried allowed-failures times if a worker goes OOM?

Yes, as there might be other causes apart from an output partition being too large.
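The retry semantics under discussion can be sketched generically (this is an illustrative loop, not scheduler code): a computation is resubmitted until its failure count exceeds the allowed-failures threshold, which is why a transient cause such as a spot interruption can succeed on the second attempt.

```python
def run_with_retries(task, allowed_failures: int):
    """Illustrative sketch (not the dask scheduler): retry until the
    failure count exceeds allowed_failures, then re-raise."""
    failures = 0
    while True:
        try:
            return task()
        except Exception:
            failures += 1
            if failures > allowed_failures:
                raise


attempts = 0


def flaky():
    # Fails once (e.g. a worker lost to a spot interruption), then succeeds.
    global attempts
    attempts += 1
    if attempts == 1:
        raise OSError("worker lost")
    return "done"


result = run_with_retries(flaky, allowed_failures=1)
print(result)  # done -- one failure was tolerated
```

With `allowed_failures=0`, as in the test above, the first failure would propagate immediately instead of triggering a restart.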
```python
out = dd.shuffle.shuffle(df, "x", shuffle="p2p")
out = out.persist()
x, y = c.compute([df.x.size, out.x.size])
await wait_for_tasks_in_state("shuffle-p2p", "memory", 1, b)
```
I guess this is out of scope for this PR, but I think it would make sense to have an API to easily access the actual shuffle instances held by the plugins, with a stage attribute that indicates whether we're in the transfer, barrier, or unpack stage. I would find this kind of verification nicer than waiting for task states.
fjetter left a comment
```diff
 def _create_shuffle_run(
     self, shuffle_id: ShuffleId, result: dict[str, Any]
 ) -> ShuffleRun:
     shuffle: ShuffleRun
     if result["type"] == ShuffleType.DATAFRAME:
-        shuffle = DataFrameShuffleRun(
-            column=result["column"],
-            worker_for=result["worker_for"],
-            output_workers=result["output_workers"],
-            id=shuffle_id,
-            run_id=result["run_id"],
-            directory=os.path.join(
-                self.worker.local_directory,
-                f"shuffle-{shuffle_id}-{result['run_id']}",
-            ),
-            executor=self._executor,
-            local_address=self.worker.address,
-            rpc=self.worker.rpc,
-            scheduler=self.worker.scheduler,
-            memory_limiter_disk=self.memory_limiter_disk,
-            memory_limiter_comms=self.memory_limiter_comms,
-        )
+        shuffle = self._create_dataframe_shuffle_run(shuffle_id, result)
     elif result["type"] == ShuffleType.ARRAY_RECHUNK:
-        shuffle = ArrayRechunkRun(
-            worker_for=result["worker_for"],
-            output_workers=result["output_workers"],
-            old=result["old"],
-            new=result["new"],
-            id=shuffle_id,
-            run_id=result["run_id"],
-            directory=os.path.join(
-                self.worker.local_directory,
-                f"shuffle-{shuffle_id}-{result['run_id']}",
-            ),
-            executor=self._executor,
-            local_address=self.worker.address,
-            rpc=self.worker.rpc,
-            scheduler=self.worker.scheduler,
-            memory_limiter_disk=self.memory_limiter_disk,
-            memory_limiter_comms=self.memory_limiter_comms,
-        )
+        shuffle = self._create_array_rechunk_run(shuffle_id, result)
     else:  # pragma: no cover
         raise TypeError(result["type"])
     self.shuffles[shuffle_id] = shuffle
     self._runs.add(shuffle)
     return shuffle

+def _create_dataframe_shuffle_run(
+    self, shuffle_id: ShuffleId, result: dict[str, Any]
+) -> DataFrameShuffleRun:
+    return DataFrameShuffleRun(
+        column=result["column"],
+        worker_for=result["worker_for"],
+        output_workers=result["output_workers"],
+        id=shuffle_id,
+        run_id=result["run_id"],
+        directory=os.path.join(
+            self.worker.local_directory,
+            f"shuffle-{shuffle_id}-{result['run_id']}",
+        ),
+        executor=self._executor,
+        local_address=self.worker.address,
+        rpc=self.worker.rpc,
+        scheduler=self.worker.scheduler,
+        memory_limiter_disk=self.memory_limiter_disk,
+        memory_limiter_comms=self.memory_limiter_comms,
+    )

+def _create_array_rechunk_run(
+    self, shuffle_id: ShuffleId, result: dict[str, Any]
+) -> ArrayRechunkRun:
+    return ArrayRechunkRun(
+        worker_for=result["worker_for"],
+        output_workers=result["output_workers"],
+        old=result["old"],
+        new=result["new"],
+        id=shuffle_id,
+        run_id=result["run_id"],
+        directory=os.path.join(
+            self.worker.local_directory,
+            f"shuffle-{shuffle_id}-{result['run_id']}",
+        ),
+        executor=self._executor,
+        local_address=self.worker.address,
+        rpc=self.worker.rpc,
+        scheduler=self.worker.scheduler,
+        memory_limiter_disk=self.memory_limiter_disk,
+        memory_limiter_comms=self.memory_limiter_comms,
+    )
```
Cosmetic refactoring to make it easier to understand whether we could potentially encounter races.
```python
if worker not in self.scheduler.workers:
    raise RuntimeError(f"Scheduler is unaware of this worker {worker!r}")
```
Can this be tested by retiring a worker during a shuffle in a test?

I haven't been able to come up with a scenario where this would happen, but given how messy worker shutdown can be, I'm not 100% certain it would never happen. I've left it in with a note for now.
```python
if worker not in shuffle.participating_workers:
    continue
```
Test by adding a worker to the cluster and then restarting a shuffle?
```python
continue

stimulus_id = f"shuffle-failed-worker-left-{time()}"
self._restart_shuffle(shuffle.id, scheduler, stimulus_id=stimulus_id)
```
OK, so first we restart all shuffles that were interrupted.

I think that restarting this shuffle should remove it from _archived_by, but I do not see that happening. Do I have that right? Or does this somehow create a new shuffle object that has archived_by = None? Otherwise it seems like it might get lost in _clean_on_scheduler.

Restarting a shuffle removes the ShuffleState from active states. The first shuffle_transfer task to call shuffle_get_or_create will cause the SchedulerPlugin to create a new ShuffleState with an incremented run_id and _archived_by = None.
```python
# If processing the transactions causes a task to get released, this
# removes the shuffle from self.active_shuffles. Therefore, we must iterate
# over a copy.
for shuffle_id, shuffle in self.active_shuffles.copy().items():
```
Then we iterate over all active shuffles, remove, and restart? Why do we not unconditionally restart the archived shuffles after this loop over active shuffles?

I'm not 100% sure I'm following, but what I think you're saying is a very good point.
```python
async with self._runs_condition:
    await self._runs_condition.wait_for(lambda: not self._runs)
```
This lock protects _runs with respect to _close_shuffle_run, but not with respect to _refresh_shuffle, I think.

I've renamed it to _runs_cleanup_condition to highlight that it's only concerned with cleanup. There's a different mechanism in place for adding to self._runs. (Feel free to refactor in a follow-up if you see a good way of doing so.)
I've added another test; now all feedback should be addressed.
wence- left a comment

To the best of my understanding, this looks right!
Closes #7353

Blocked by and includes #7967
Blocked by and includes #7979
Blocked by and includes #7981
Blocked by and includes #7974

pre-commit run --all-files