Skip to content

[FEA] Dynamic Task Graph / Task Checkpointing #3811

@madsbk

Description

@madsbk

TL;DR: By introducing task checkpointing where a running task can update its state on the scheduler, it is possible to reduce scheduler overhead, support long running tasks, and uses explicit worker-to-worker communication while maintaining resilient.

Motivation

As discussed in many issues and PRs (e.g. #3783, #854, #3139, dask/dask#6163), the scheduler overhead of Dask/Distribued can be a problem as the number of tasks increases. Many proposals involves optimizing the Python code through PyPy, Cython, Rust, or some other tool/language.

This PR propose an orthogonal approach that reduces the number of tasks and make it possible to encapsulate domain knowledge of specific operations into tasks -- such as minimizing memory use, overlapping computation and communication, etc.

Related Approaches

Current Task Workflow

All tasks go through the follow flow:

**Client**  
  1. Graph creation  
  2. Graph optimization 
  3. Serialize graph 
  4. Send graph to scheduler 
**Scheduler** 
  5. Update graph 
  6. Send tasks, one at a time, to workers 
**Worker**  
  7. Execute tasks

Task Fusion

All tasks go through steps 1 to 4 but by fusing tasks (potential into SubgraphCallable) only a reduced graph goes through step 5 and 6, which can significantly easy the load on the scheduler. However, fusing tasks also limits the available parallelism thus it has its limits.

Task Generation

At graph creation, we use task generators to reduce the size of the graph. Particularly, in operations such as shuffle() that consist of up to n**2 number of tasks. This means that only steps 3 to 7 encounter all tasks. And if we allow the scheduler to execute python code, we can extend this to steps 5 to 7.

Submit Tasks from Tasks

Instead of implementing expensive operations such as shuffle() in a task graph, we can use few long running jobs that use direct worker-to-worker communicate to bypass the scheduler altogether. This approach is very performance efficient but also has two major drawbacks:

  • It provides no resilient, if a worker disconnects unexpected the states of the long running jobs are all lost.
  • In cases such as shuffle(), this approach requires extra memory because the inputs to the long running jobs must be in-memory until the jobs completes. Something that can be an absolute deal breaker [HACK] Ordering to priorities "shuffle-split" dask#6051.

Proposed Approach

Dynamic Task Graph / Task Checkpointing

At graph creation, we use dynamic tasks to reduce the size of the graph and encapsulate domain knowledge of specific operations. This means that only step 7 encounters all tasks.

Dynamic tasks are regular tasks that are optimized, scheduled, and executed on workers as regular tasks. It is only when they use checkpointing that they differ. The following is the logic flow when a running task calls checkpointing:

  1. A task running on a worker sends a task update to the scheduler that contains:
    • New keys that is now in-memory on the worker
    • New keys that the task now depend on
    • Existing keys that the task doesn’t depend on anymore
    • A new task (function & key/literal arguments) that replaces the existing task.
  2. The scheduler updates relevant TaskStates and release keys that no one depend on anymore.
  3. If all dependencies are satisfied, the task can now be rescheduled from its new state. If not, the task transits to the waiting state.

Any thoughts? Is it something I should begin implementing?

cc. @mrocklin, @quasiben, @rjzamora, @jakirkham

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions