Description
The current implementation of the worker's `ensure_communicating` will continue to fetch dependencies for as long as there are dependencies to fetch. This can push an already overloaded worker over the edge and cause it to fail.
distributed/distributed/worker.py, lines 2684 to 2687 in 8734c9d:

    while self.data_needed and (
        len(self.in_flight_workers) < self.total_out_connections
        or self.comm_nbytes < self.comm_threshold_bytes
    ):

A paused worker should not be allowed to fetch more data.
There are two possible ways to achieve this:
- Add another guard to `ensure_communicating` to stop scheduling additional `gather_dep` coroutines.
- Remove all tasks from a paused worker that aren't in memory. This would indirectly empty the `data_needed` heap and cause the worker to stabilize. This could be achieved by either aggressively stealing or by implementing a custom scheduler handler.
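
To make the first option concrete, here is a minimal, self-contained sketch of what such a guard could look like. It is not the actual `distributed` worker code: `ToyWorker`, its `scheduled` list, and the `(priority, key, nbytes)` layout of `data_needed` are hypothetical stand-ins, and keying `in_flight_workers` by task key is a simplification made only for this illustration.

```python
import heapq
from dataclasses import dataclass, field
from enum import Enum


class Status(Enum):
    running = "running"
    paused = "paused"


@dataclass
class ToyWorker:
    """Hypothetical stand-in for the worker, not the real distributed.Worker."""

    status: Status = Status.running
    total_out_connections: int = 2
    comm_threshold_bytes: int = 100
    comm_nbytes: int = 0
    # Heap of (priority, key, nbytes) tuples; this layout is an assumption.
    data_needed: list = field(default_factory=list)
    # Simplification: keyed by task key rather than by worker address.
    in_flight_workers: dict = field(default_factory=dict)
    # Records which keys a fetch would have been started for.
    scheduled: list = field(default_factory=list)

    def ensure_communicating(self) -> None:
        # Proposed extra guard: a paused worker starts no new fetches.
        if self.status == Status.paused:
            return
        # Same loop condition as the snippet quoted in this issue.
        while self.data_needed and (
            len(self.in_flight_workers) < self.total_out_connections
            or self.comm_nbytes < self.comm_threshold_bytes
        ):
            _, key, nbytes = heapq.heappop(self.data_needed)
            self.in_flight_workers[key] = nbytes
            self.comm_nbytes += nbytes
            # The real worker would schedule a gather_dep coroutine here.
            self.scheduled.append(key)
```

With this guard, `data_needed` is left untouched while the worker is paused, so something would still have to call `ensure_communicating` again once the worker resumes.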
I think both options have a certain appeal. I'm wondering which one is the best to choose, specifically in the context of the latest changes to AMM / retirement / pause.
cc @crusaderky
Note: right now, network traffic is only restricted for egress, i.e. incoming get_data requests from other workers, see
distributed/distributed/worker.py, lines 1712 to 1716 in 8734c9d:

    if self.status == Status.paused:
        max_connections = 1
        throttle_msg = " Throttling outgoing connections because worker is paused."
    else:
        throttle_msg = ""