Redesign worker exponential backoff on busy-gather #6169

@crusaderky

Description

When a worker receives a gather_dep request from another worker, but it is too busy with other transfers already, it will reply {"status": "busy"}. When this happens, there are three possible use cases:

  1. there are other workers that hold replicas of the same task, and the requesting worker knows about them
  2. there are other workers that hold replicas of the same task, but the requesting worker doesn't know about them; the worker needs to ask the scheduler through query_who_has
  3. the busy worker holds the only replica of that task

worker.py implements an exponential backoff system, introduced in 2018 and never revisited since (#2092), which in theory prevents the worker from

  • spamming the busy worker with consecutive gather requests
  • spamming the scheduler with query_who_has

It is implemented as follows (pseudocode):

class Worker:
    repetitively_busy = 0
    data_needed = Heap()  # tasks in fetch status
    
    async def handle_scheduler(self):
        while True:
            await next_message_from_scheduler
            self.ensure_communicating()

    def ensure_communicating(self):
        while self.data_needed:
            ts = self.data_needed.pop()

            ws_local = [ws for ws in ts.who_has if same_host(ws, self)]
            ws_remote = [ws for ws in ts.who_has if not same_host(ws, self)]
            ws = random.choice(ws_local or ws_remote)

            self.transition_fetch_flight(ts)
            loop.add_callback(self.gather_dep, ws, ts)  # fire and forget task

    async def gather_dep(self, ws, ts):
        response = await get_data_from_worker(ws, ts)
        if response["status"] == "busy":
            self.transition_flight_fetch(ts)  # Add task back to self.data_needed
            # Exponential backoff to avoid hammering scheduler/worker
            self.repetitively_busy += 1
            await asyncio.sleep(0.100 * 1.5**self.repetitively_busy)
            await self.query_who_has(ts)
        else:
            self.repetitively_busy = 0
            # Omissis: deal with successful response and other failure use cases
        self.ensure_communicating()

If any other unrelated gather_dep completes or any message arrives from the scheduler, ensure_communicating will try fetching the task again. On a busy worker, this will happen almost instantly.
As there is no way to flag which worker was busy, if the busy worker was either the only local worker or the only worker on the whole cluster holding the data, it will be targeted again with get_data_from_worker.
This means that, almost instantly, the busy worker will reply the same thing and repetitively_busy will bump up a notch - unless a gather_dep from another worker recently terminated and the other worker was not busy.
It also means that there will be multiple identical gather_dep calls running in parallel, all waiting on the sleep.
query_who_has will kick off potentially several seconds later, by which time it may no longer be needed, e.g. because another iteration of ensure_communicating has successfully fetched the task from another worker.

On the flip side, the worker may experience a burst of heavy chatter with the scheduler (but not other workers), followed by silence. For example, a bunch of tasks are submitted in a short timeframe, with just enough time between each so that they land in different messages, and now we need to wait for their execution to complete. This may cause repetitively_busy to rocket up to a very high number and sleep for - potentially - centuries before anything kicks off ensure_communicating again. DEADLOCK!
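To put numbers on this (illustrative arithmetic only, not code from worker.py), the old design's sleep of 0.100 * 1.5**repetitively_busy grows like this:

```python
# Illustrative arithmetic only: how the old backoff's sleep duration
# grows with repetitively_busy. The counter is reset only by a
# non-busy response and is never decayed over time, so a burst of
# busy replies pushes the next sleep beyond any useful timescale.
def backoff_sleep(repetitively_busy: int) -> float:
    """Sleep used by the old design: 0.100 * 1.5**n seconds."""
    return 0.100 * 1.5**repetitively_busy

print(f"{backoff_sleep(10):.1f} s")   # ~5.8 seconds: already sluggish
print(f"{backoff_sleep(50):.3g} s")   # ~6.4e+07 seconds: roughly two years
print(f"{backoff_sleep(100):.3g} s")  # ~4e+16 seconds: longer than the age of Earth
```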

Finally, this interacts with select_keys_for_gather, which coalesces individually small fetches into the same message. If there are multiple pending tasks in fetch status and they're fairly large (target_message_size = 50MB), then a single ensure_communicating will spawn multiple concurrent gather_dep requests to the same worker, up to total_out_connections. They may all return busy, which in turn increases repetitively_busy by several notches in a single go. Reducing target_message_size may result in a substantial slowdown, not because of the select_keys_for_gather system itself, but because repetitively_busy reaches a higher number in a single iteration of ensure_communicating.
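A back-of-the-envelope sketch of that effect (hypothetical numbers, not distributed code): with k concurrent gather_dep calls all returning busy, each round advances the counter by k notches, so the backoff compounds at 1.5**k per round instead of 1.5:

```python
# Hypothetical sketch: one ensure_communicating spawning k concurrent
# gather_dep calls to the same busy worker bumps repetitively_busy by
# k notches in a single round, compounding the backoff much faster.
def sleep_after_rounds(k_concurrent: int, rounds: int) -> float:
    notches = k_concurrent * rounds
    return 0.100 * 1.5**notches

print(sleep_after_rounds(1, 3))   # 0.3375 s: mild backoff
print(sleep_after_rounds(10, 3))  # ~19000 s: over five hours after 3 rounds
```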

Besides its own shortcomings, this design is a blocker to #5896. The refactor removes the periodic kick to ensure_communicating from handle_scheduler, and instead kicks it off from transition_*_fetch. This means that the above will enter an infinite loop where the requesting worker starts accumulating more and more gather_dep tasks, and the busy worker is constantly hammered with get_data_from_worker. The sleep time quickly rises into the centuries.
A naive solution would be to move the sleep before self.transition_flight_fetch(ts). However, this would mean losing an unplanned but highly desirable feature of the current design: if there are multiple workers holding replicas of the same task and the contacted worker is busy, another random worker is tried straight away, without waiting for the sleep.

Proposed design

  • Get rid of repetitively_busy.
  • Add to distributed.worker.WorkerState a busy: bool flag.
  • Worker changes as follows:
class Worker:
    data_needed = Heap()  # tasks in fetch status
    
    async def handle_scheduler(self):
        while True:
            await next_message_from_scheduler
            self.ensure_communicating()  # To be removed in #5896

    def ensure_communicating(self):
        busy_pushback = []
        while self.data_needed:
            ts = self.data_needed.pop()

            who_has = [ws for ws in ts.who_has if not ws.busy]
            if not who_has:
                busy_pushback.append(ts)
                continue

            ws_local = [ws for ws in who_has if same_host(ws, self)]
            ws_remote = [ws for ws in who_has if not same_host(ws, self)]
            ws = random.choice(ws_local or ws_remote)

            self.transition_fetch_flight(ts)
            loop.add_callback(self.gather_dep, ws, ts)  # fire and forget task

        for ts in busy_pushback:
            self.data_needed.push(ts)

    async def gather_dep(self, ws, ts):
        response = await get_data_from_worker(ws, ts)
        if response["status"] == "busy":
            ws.busy = True

            def reset_busy():
                ws.busy = False
                self.ensure_communicating()

            call_later(0.15, reset_busy)  # Hardcoded, not incremental

            self.transition_flight_fetch(ts)  # Add task back to self.data_needed
            self.ensure_communicating()
            if ts.status != "fetch":
                return  # fetching from another worker

            if all(ws2.busy for ws2 in ts.who_has):
                await self.query_who_has(ts)
                self.ensure_communicating()

The new algorithm works as follows:

  1. try fetching the task from a random, non-busy local worker
  2. if busy and there are non-busy local workers, goto 1
  3. if busy and all local workers are busy, try fetching the task from a random, non-busy remote worker
  4. if busy and there are non-busy remote workers, goto 3
  5. if at any time while trying remote workers a local worker becomes non-busy, goto 1
  6. if all local and remote workers are busy, immediately query_who_has the scheduler to discover more workers
  7. if the scheduler responds saying that previously unknown workers hold a replica, goto 1
  8. if all else fails, sleep 150ms and then goto 1
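Steps 1-4 above can be sketched as a self-contained selection helper (plain Python with hypothetical names; WorkerState here is a minimal stand-in for the real class, carrying only the proposed busy flag):

```python
# Minimal sketch of the proposed selection rule: among the workers
# known to hold a replica, skip busy ones and prefer local
# (same-host) workers over remote ones.
import random
from dataclasses import dataclass

@dataclass
class WorkerState:  # stand-in for distributed.worker.WorkerState
    address: str
    busy: bool = False

def same_host(ws: WorkerState, self_address: str) -> bool:
    return ws.address.split(":")[0] == self_address.split(":")[0]

def pick_worker(who_has, self_address):
    """Return a non-busy worker, local ones first; None if all are busy."""
    candidates = [ws for ws in who_has if not ws.busy]
    if not candidates:
        return None  # caller pushes the task back and may query_who_has
    local = [ws for ws in candidates if same_host(ws, self_address)]
    remote = [ws for ws in candidates if not same_host(ws, self_address)]
    return random.choice(local or remote)

workers = [
    WorkerState("10.0.0.1:1234", busy=True),  # local, busy
    WorkerState("10.0.0.1:1235"),             # local, free
    WorkerState("10.0.0.2:1234"),             # remote, free
]
print(pick_worker(workers, "10.0.0.1:9999").address)  # 10.0.0.1:1235
workers[1].busy = True
print(pick_worker(workers, "10.0.0.1:9999").address)  # 10.0.0.2:1234
workers[2].busy = True
print(pick_worker(workers, "10.0.0.1:9999"))          # None
```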

Note

If at least one local worker holds a replica, the old algorithm waits indefinitely for a local worker to become non-busy and completely ignores remote workers, thus avoiding the cost of network transfer but incurring a delay. The new algorithm instead falls back to remote workers immediately.
