Factor out and instrument task categorization logic - static graph analysis #6922

@fjetter

TL;DR: I think we should build out our instrumentation around root-ish tasks to improve visibility and UX, and to enable further research in this space.


When deciding which worker to schedule a task on, we do not treat all tasks equally.

Prior and ongoing work in this area includes the effort to enforce co-location and the effort to withhold tasks on the scheduler. Both single out a specific class of tasks and implement special scheduling heuristics for them. These efforts introduce the notion of a "root-ish task", which refers to nodes in the graph that likely exhibit a fan-out / reduce pattern. These tasks need special treatment because we assign tasks that are not yet runnable (i.e. dependents) to workers just-in-time, but assign ready tasks greedily. This can temporarily bypass the depth-first ordering of dask.order, which leads to suboptimal scheduling and significantly higher cluster-wide memory consumption for many use cases. This behavior is commonly referred to as root task overproduction.
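To make overproduction concrete, here is a toy single-worker model (an assumption for illustration, not how the distributed scheduler actually behaves): a binary tree reduction where a task's result stays in memory until all of its dependents have run. Running every ready root task first, which is what greedy assignment tends to do, is compared with the depth-first order that dask.order aims for. All function names are hypothetical.

```python
from collections import Counter


def tree_reduce_graph(n_roots):
    """Binary tree reduction over ``n_roots`` root tasks (a power of two).

    Keys are "L<level>-<index>"; the dict maps each key to its dependencies and
    is built level by level, so its insertion order is breadth-first.
    """
    deps = {f"L0-{i}": [] for i in range(n_roots)}
    level, width = 0, n_roots
    while width > 1:
        for i in range(width // 2):
            deps[f"L{level + 1}-{i}"] = [f"L{level}-{2 * i}", f"L{level}-{2 * i + 1}"]
        level, width = level + 1, width // 2
    return deps


def depth_first_order(deps, sink):
    """Post-order walk from the sink: finish each subtree before starting the next."""
    order = []

    def visit(key):
        for dep in deps[key]:
            visit(dep)
        order.append(key)

    visit(sink)
    return order


def peak_memory(deps, order):
    """Max number of results held at once when running tasks in ``order``.

    A result is released once all of its dependents have run; the sink is kept.
    """
    remaining = Counter(dep for key in order for dep in deps[key])
    in_memory, peak = set(), 0
    for key in order:
        in_memory.add(key)
        peak = max(peak, len(in_memory))
        for dep in deps[key]:
            remaining[dep] -= 1
            if remaining[dep] == 0:
                in_memory.discard(dep)
    return peak


deps = tree_reduce_graph(8)
print(peak_memory(deps, list(deps)))                       # all roots first: 9
print(peak_memory(deps, depth_first_order(deps, "L3-0")))  # depth-first: 5
```

In this model the root-first peak grows linearly with the number of roots (n + 1), while the depth-first peak only grows with the depth of the tree (log2(n) + 2).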

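Two approaches are commonly discussed to fix this problem: withholding root-ish tasks on the scheduler, and speculative task assignment (STA), i.e. assigning dependents to workers ahead of time.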

We are approaching consensus that both solutions would address this problem, but they take almost orthogonal approaches to scheduling. Both come with benefits, opportunities, shortcomings, costs and risks. Task withholding is currently the most likely short-term fix, since it requires comparatively few adjustments to the code base and can be implemented on the scheduler side only.

A common theme of both approaches is how we detect "special" tasks. I would like to start a conversation about generalizing the approach taken for root tasks and discuss how it could be expanded.

How are root tasks detected?

Root tasks could trivially be detected by walking up and down the task graph, but this algorithm has quadratic runtime and is not feasible to run for every task.
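As a rough illustration of why the exact approach does not scale, the sketch below uses a hypothetical per-task criterion (an assumption, not the scheduler's definition): a task counts as root-ish if its full ancestry, i.e. all of its transitive dependencies, contains fewer than five tasks. Only the upward walk is shown. Because every task repeats the walk over its own ancestry, a linear chain of n tasks already costs on the order of n²/2 steps.

```python
def ancestors(deps, key):
    """All transitive dependencies of ``key``: an O(n) walk up the graph."""
    seen, stack = set(), list(deps[key])
    while stack:
        dep = stack.pop()
        if dep not in seen:
            seen.add(dep)
            stack.extend(deps[dep])
    return seen


def naive_rootish(deps, max_ancestors=5):
    """Hypothetical exact criterion: fewer than ``max_ancestors`` transitive dependencies.

    Repeating the walk for every task makes this O(n**2) on deep graphs.
    """
    return {key for key in deps if len(ancestors(deps, key)) < max_ancestors}


# A linear chain t0 <- t1 <- ... <- t9: task ti has i ancestors.
chain = {f"t{i}": [f"t{i - 1}"] if i else [] for i in range(10)}
print(sorted(naive_rootish(chain)))  # ['t0', 't1', 't2', 't3', 't4']
```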

Instead, the current approach uses TaskGroups to infer the root task property:

```python
# Group is larger than cluster with few dependencies?
# Minimize future data transfers.
if (
    valid_workers is None
    and len(tg) > self.total_nthreads * 2
    and len(tg.dependencies) < 5
    and sum(map(len, tg.dependencies)) < 5
):
    ...  # root-ish scheduling branch (body not shown in this excerpt)
```

The dynamic component of this `if` clause (`valid_workers is None and len(tg) > self.total_nthreads * 2`) protects us from making a bad scheduling decision that would reduce our ability to parallelize.
The static components, `len(tg.dependencies) < 5` and `sum(map(len, tg.dependencies)) < 5`, are what we actually use to classify root tasks: they describe a task whose group depends on only a few, relatively small task groups.
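The split between the static and the dynamic part can be sketched in isolation. The `TaskGroup` below is a hypothetical stand-in that only carries what the heuristic reads (its size and its dependency groups), and the function names are made up for illustration; the thresholds are the ones from the snippet above.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False keeps identity hashing so groups can live in sets
class TaskGroup:
    """Hypothetical stand-in for distributed's TaskGroup."""

    name: str
    n_tasks: int
    dependencies: set = field(default_factory=set)  # other TaskGroup objects

    def __len__(self):
        return self.n_tasks


def is_rootish_static(tg):
    """Static part: few dependency groups holding few tasks in total.

    Depends only on the graph, so it could be computed once per graph update.
    """
    return len(tg.dependencies) < 5 and sum(map(len, tg.dependencies)) < 5


def use_rootish_scheduling(tg, total_nthreads, valid_workers=None):
    """Dynamic part: only apply the special treatment if there are no worker
    restrictions and the group is much larger than the cluster."""
    return (
        valid_workers is None
        and len(tg) > total_nthreads * 2
        and is_rootish_static(tg)
    )


config = TaskGroup("from-delayed", n_tasks=1)
load = TaskGroup("read-csv", n_tasks=1000, dependencies={config})
total = TaskGroup("sum", n_tasks=10, dependencies={load})
print(is_rootish_static(load), is_rootish_static(total))  # True False
print(use_rootish_scheduling(load, total_nthreads=16))    # True
print(use_rootish_scheduling(load, total_nthreads=1000))  # False
```

Only `is_rootish_static` is a property of the graph; the cluster-size check has to stay in the scheduling path.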
This classification could be turned into an actual TaskState property, which has several straightforward benefits:

  1. We can compute it once, e.g. after update_graph or after dask.order. While this is not a costly computation, there is no need to do it at runtime.
  2. Since this information would always be available, we can visualize it on the dashboard and/or in other debugging/instrumentation tooling.
  3. This static information could be broadcast to workers (if there is a need for it).
  4. Since this is a static graph property, it would be easy to unit test our task categorization logic, similar to how we pin down dask.order by providing specific example graphs.
  5. Once this system is roughly formalized, task annotations (or a similar/parallel mechanism) could be used to inform scheduling decisions as well (up to building an entire dispatching mechanism).
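Points 1, 4 and 5 can be sketched together: compute the flag once per graph update, let an annotation override it, and pin the behavior down with a dask.order-style test on a small example graph. `TaskState`, `classify_tasks` and the `"rootish"` annotation key are hypothetical names for illustration. The thresholds mirror the current TaskGroup heuristic, and the cluster-size check is deliberately left out because it is not a static property.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass, field


@dataclass(eq=False)
class TaskState:
    """Hypothetical minimal task record; ``rootish`` is the proposed static property."""

    key: str
    group: str
    dependencies: list = field(default_factory=list)  # keys of dependencies
    annotations: dict = field(default_factory=dict)
    rootish: bool = False


def classify_tasks(tasks):
    """Set ``ts.rootish`` for every task once, e.g. right after a graph update."""
    group_size = Counter(ts.group for ts in tasks.values())
    group_deps = defaultdict(set)
    for ts in tasks.values():
        group_deps[ts.group].update(tasks[dep].group for dep in ts.dependencies)
    rootish_groups = {
        group
        for group in group_size
        if len(group_deps[group]) < 5
        and sum(group_size[g] for g in group_deps[group]) < 5
    }
    for ts in tasks.values():
        # A (hypothetical) "rootish" annotation takes precedence over the heuristic.
        ts.rootish = ts.annotations.get("rootish", ts.group in rootish_groups)


def test_fan_out_reduce():
    # 10 loads -> 5 pairwise sums -> 1 total
    tasks = {f"load-{i}": TaskState(f"load-{i}", "load") for i in range(10)}
    for i in range(5):
        deps = [f"load-{2 * i}", f"load-{2 * i + 1}"]
        tasks[f"sum-{i}"] = TaskState(f"sum-{i}", "sum", deps)
    tasks["total"] = TaskState("total", "total", [f"sum-{i}" for i in range(5)])
    classify_tasks(tasks)
    assert all(tasks[f"load-{i}"].rootish for i in range(10))
    assert not any(tasks[f"sum-{i}"].rootish for i in range(5))
    assert not tasks["total"].rootish


test_fan_out_reduce()
```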

How can we use this? What's the link to STA?

2.) will be very useful from a UX perspective. Our scheduling is already relatively opaque; if we now start to withhold some tasks because they are special, it would be nice to visualize this (e.g. as part of the dask.order visualization, the graph bokeh dashboard, or annotations when hovering over the task stream).

From a developer's perspective, I strongly believe this is equally helpful for discussing less trivial (non-root) root-ish task graphs.

Apart from UX, I consider 2.), 4.) and, to a lesser extent, 3.) valuable components for further research in this space. For instance, do all commonly appearing "root-ish" tasks belong to the same category of subtopologies, or do they break up into further categories? Are there topologies for which we know we can afford exact solutions[1]?
How valuable would it be to introduce manual annotations that mark certain tasks as special (for instance, data generators, tasks with high memory use, reducers)? I'm sure there are more such questions.

Apart from worker-side state machine changes (which I consider quite doable after #5736), one of the big open questions in STA is "which tasks should we schedule ahead of time?"
As outlined above, I consider root task withholding and STA of dependents to be symmetrical problems when it comes to this decision. I believe further research into these task classifications could inform future decisions for both approaches.

[1] For example, think of trivial map-reduce/tree-reduce patterns. If we can detect that we're operating on these topologies, it is almost trivial to implement perfect scheduling. Once we have classified a task as root-ish, we could further probe whether we are dealing with a trivial reduction and implement a special scheduling decision for it.
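A first probe for the footnote's case could check whether a set of root-ish tasks feeds a pure tree reduction: every task downstream of the roots has at most one dependent, downstream tasks take no inputs from outside that subgraph, and everything converges on a single sink. This is a hypothetical sketch of the detection only, on a plain dict mapping keys to dependency keys; it does not say what the resulting scheduling decision should be.

```python
from collections import defaultdict


def is_trivial_reduction(deps, roots):
    """Hypothetical check: do ``roots`` feed a pure in-tree ending in one sink?"""
    dependents = defaultdict(list)
    for key, key_deps in deps.items():
        for dep in key_deps:
            dependents[dep].append(key)

    # Collect everything reachable downstream of the roots.
    seen, stack = set(), list(roots)
    while stack:
        key = stack.pop()
        if key not in seen:
            seen.add(key)
            stack.extend(dependents[key])

    no_fan_out = all(len(dependents[key]) <= 1 for key in seen)
    # Downstream tasks must not pull in inputs from outside the subgraph.
    closed = all(dep in seen for key in seen - set(roots) for dep in deps[key])
    single_sink = sum(1 for key in seen if not dependents[key]) == 1
    return no_fan_out and closed and single_sink


tree = {"a": [], "b": [], "c": [], "d": [],
        "ab": ["a", "b"], "cd": ["c", "d"], "total": ["ab", "cd"]}
shuffle = {"a": [], "b": [], "x": ["a", "b"], "y": ["a", "b"]}
print(is_trivial_reduction(tree, ["a", "b", "c", "d"]))  # True
print(is_trivial_reduction(shuffle, ["a", "b"]))         # False
```

The check is linear in the size of the graph, so it could run once per classified group rather than on every scheduling decision.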

cc @gjoseph92
