Description
When a worker receives a gather_dep request from another worker but it is already too busy with other transfers, it replies {"status": "busy"} (the responding side is sketched below). When this happens, there are three possible use cases:
- there are other workers that hold replicas of the same task, and the requesting worker knows about them
- there are other workers that hold replicas of the same task, but the requesting worker doesn't know about them; the worker needs to ask the scheduler through query_who_has
- the busy worker holds the only replica of that task
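For context, this is roughly what the responding side does. This is a minimal sketch with illustrative names (total_in_connections, incoming_count); the real get_data handler differs in detail:

```python
class Worker:
    total_in_connections = 10  # illustrative limit on transfers served concurrently
    incoming_count = 0         # transfers currently being served to other workers
    data: dict = {}            # key -> in-memory task result

    async def get_data(self, comm, keys):
        # Refuse the transfer outright instead of queueing it;
        # the requester is expected to back off and retry.
        if self.incoming_count >= self.total_in_connections:
            return {"status": "busy"}
        self.incoming_count += 1
        try:
            return {"status": "OK", "data": {k: self.data[k] for k in keys}}
        finally:
            self.incoming_count -= 1
```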
worker.py contains an exponential backoff system, implemented in 2018 and never revisited since (#2092), which in theory prevents it from:
- spamming the busy worker with consecutive gather requests
- spamming the scheduler with query_who_has
It is implemented as follows (pseudocode):
```python
class Worker:
    repetitively_busy = 0
    data_needed = Heap()  # tasks in fetch status

    async def handle_scheduler(self):
        while True:
            await next_message_from_scheduler
            self.ensure_communicating()

    def ensure_communicating(self):
        while self.data_needed:
            ts = self.data_needed.pop()
            ws_local = [ws for ws in ts.who_has if same_host(ws, self)]
            ws_remote = [ws for ws in ts.who_has if not same_host(ws, self)]
            ws = random.choice(ws_local or ws_remote)
            self.transition_fetch_flight(ts)
            loop.add_callback(self.gather_dep, ws, ts)  # fire-and-forget task

    async def gather_dep(self, ws, ts):
        response = await get_data_from_worker(ws, ts)
        if response["status"] == "busy":
            self.transition_flight_fetch(ts)  # Add task back to self.data_needed
            # Exponential backoff to avoid hammering scheduler/worker
            self.repetitively_busy += 1
            await asyncio.sleep(0.100 * 1.5**self.repetitively_busy)
            await self.query_who_has(ts)
        else:
            self.repetitively_busy = 0
            # Omitted: deal with successful response and other failure use cases
        self.ensure_communicating()
```

If any other unrelated gather_dep completes or any message arrives from the scheduler, ensure_communicating will try fetching the task again. On a busy worker, this will happen almost instantly.
As there is no way to flag which worker was busy, if the busy worker was either the only local worker or the only worker on the whole cluster holding the data, it will be targeted again with get_data_from_worker.
This means that, almost instantly, the busy worker will reply the same thing and repetitively_busy will bump up a notch - unless a gather_dep from another worker recently terminated and the other worker was not busy.
It also means that there will be multiple identical gather_dep calls running in parallel, all waiting on the sleep.
query_who_has will kick off potentially several seconds later, possibly when it's no longer needed, e.g. because another iteration of ensure_communicating has successfully fetched the task from another worker in the meantime.
On the flip side, the worker may experience a burst of heavy chatter with the scheduler (but not with other workers), followed by silence. For example, a bunch of tasks are submitted in a short timeframe, with just enough time between them that they land in different messages, and then we need to wait for their execution to complete. This may cause repetitively_busy to rocket up to a very high number and the backoff to sleep for, potentially, centuries before anything kicks off ensure_communicating again. DEADLOCK!
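To put numbers on the "centuries" (illustrative arithmetic, not from the issue): the sleep after the n-th consecutive busy response is 0.100 * 1.5**n seconds, which grows brutally fast:

```python
# Sleep after the n-th consecutive "busy" response: 0.100 * 1.5**n seconds
for n in (10, 20, 40, 60):
    seconds = 0.100 * 1.5**n
    print(f"n={n}: {seconds:.6g} s")
# n=10: ~5.8 s
# n=20: ~5.5 minutes
# n=40: ~12.8 days
# n=60: ~117 years
```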
Finally, this interacts with select_keys_for_gather, which coalesces individually small fetches into the same message. If there are multiple tasks pending in fetch status and they're fairly large (target_message_size = 50MB), then a single ensure_communicating will spawn multiple concurrent gather_dep requests to the same worker, up to total_out_connections. They may all return busy, which in turn will increase repetitively_busy by several notches in a single go. Reducing target_message_size may therefore result in a substantial slowdown, not because of the select_keys_for_gather system itself, but because repetitively_busy reaches a higher number in a single iteration of ensure_communicating.
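A back-of-the-envelope illustration of that jump (all numbers assumed for the example):

```python
# Suppose one ensure_communicating pass fires 10 parallel gather_dep
# requests at the same worker (bounded by total_out_connections), and
# every single one of them comes back "busy":
repetitively_busy = 0
for _ in range(10):
    repetitively_busy += 1  # one notch per busy reply

# The last of those replies already sleeps
print(0.100 * 1.5**repetitively_busy)  # ~5.8 s, instead of the initial 0.15 s
```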
Besides its own shortcomings, this design is a blocker for #5896. That refactor removes the periodic kick to ensure_communicating from handle_scheduler and instead kicks it off from transition_*_fetch. This means that the design above will enter an infinite loop: the requesting worker accumulates more and more gather_dep tasks, the busy worker is constantly hammered with get_data_from_worker, and the sleep time quickly rises into the centuries.
A naive solution would be to move the sleep before self.transition_flight_fetch(ts). However, this would mean losing an unplanned but highly desirable feature of the current design: if there are multiple workers holding replicas of the same task and the requesting worker is currently chatty (so ensure_communicating keeps being retriggered), another random worker will be tried straight away, without waiting for the sleep.
Proposed design
- Get rid of repetitively_busy.
- Add a busy: bool flag to distributed.worker.WorkerState.
- Worker changes as follows:
```python
class Worker:
    data_needed = Heap()  # tasks in fetch status

    async def handle_scheduler(self):
        while True:
            await next_message_from_scheduler
            self.ensure_communicating()  # To be removed in #5896

    def ensure_communicating(self):
        busy_pushback = []
        while self.data_needed:
            ts = self.data_needed.pop()
            who_has = [ws for ws in ts.who_has if not ws.busy]
            if not who_has:
                busy_pushback.append(ts)
                continue
            ws_local = [ws for ws in who_has if same_host(ws, self)]
            ws_remote = [ws for ws in who_has if not same_host(ws, self)]
            ws = random.choice(ws_local or ws_remote)
            self.transition_fetch_flight(ts)
            loop.add_callback(self.gather_dep, ws, ts)  # fire-and-forget task
        for ts in busy_pushback:
            self.data_needed.push(ts)

    async def gather_dep(self, ws, ts):
        response = await get_data_from_worker(ws, ts)
        if response["status"] == "busy":
            ws.busy = True

            def reset_busy():
                ws.busy = False
                self.ensure_communicating()

            call_later(0.15, reset_busy)  # Hardcoded, not incremental

            self.transition_flight_fetch(ts)  # Add task back to self.data_needed
            self.ensure_communicating()
            if ts.status != "fetch":
                return  # fetching from another worker
            if all(ws2.busy for ws2 in ts.who_has):
                await self.query_who_has(ts)
                self.ensure_communicating()
```

The new algorithm works as follows (a toy simulation follows the list):
1. try fetching the task from a random, non-busy local worker
2. if busy and there are non-busy local workers, goto 1
3. if busy and all local workers are busy, try fetching the task from a random, non-busy remote worker
4. if busy and there are non-busy remote workers, goto 3
5. if at any time while trying remote workers a local worker becomes non-busy, goto 1
6. if all local and remote workers are busy, immediately query_who_has the scheduler to discover more workers
7. if the scheduler responds saying that previously unknown workers hold a replica, goto 1
8. if all else fails, sleep 150 ms and then goto 1
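To make the retry order concrete, here is a toy, synchronous simulation of just the source-selection part. Everything here is illustrative (FakeWorker, pick_source, and the hardcoded flags are made up for the example); only the local-first preference, the busy filter, and the 0.15 s pause come from the design above:

```python
import random
import time

class FakeWorker:
    def __init__(self, name, local, busy):
        self.name, self.local, self.busy = name, local, busy

def pick_source(who_has):
    """Return a non-busy holder, preferring local ones, or None if all are busy."""
    candidates = [ws for ws in who_has if not ws.busy]
    if not candidates:
        return None
    local = [ws for ws in candidates if ws.local]
    remote = [ws for ws in candidates if not ws.local]
    return random.choice(local or remote)

who_has = [
    FakeWorker("l1", local=True, busy=True),
    FakeWorker("r1", local=False, busy=True),
    FakeWorker("r2", local=False, busy=False),
]

ws = pick_source(who_has)
print(ws.name)  # "r2": the only local holder is busy, so fall back to remote

who_has[2].busy = True
if pick_source(who_has) is None:
    # All known holders are busy: this is where the real worker would
    # query_who_has the scheduler and, failing that, retry after 150 ms.
    time.sleep(0.15)
```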
Note
If at least one local worker holds a replica, the old algorithm waits indefinitely for a local worker to become non-busy and completely ignores remote workers, thus avoiding the cost of network transfer but incurring a delay. The new algorithm instead immediately falls back to remote workers.