TL;DR I think we should build out our instrumentation around root-ish tasks to improve visibility, UX and enable further research in this space
When deciding on which worker to schedule a task on we're not treating all tasks equally.
Prior and ongoing work in this area includes the effort to enforce co-location and the effort to withhold tasks on the scheduler. Both cases single out a specific class of tasks and implement special scheduling heuristics for them. These issues introduce the notion of a "root-ish task", which refers to nodes in the graph that likely exhibit a fan-out / reduce pattern. The reason these need special treatment is an artifact of us assigning not-yet-runnable tasks (i.e. dependents) to workers just-in-time while assigning ready tasks greedily. This can temporarily bypass the depth-first ordering of dask.order, which causes suboptimal scheduling and significantly higher cluster-wide memory consumption for many use cases. This behavior is commonly referred to as root task overproduction.
There are two approaches commonly discussed to fix this problem:
- Do not assign ready tasks (root-ish tasks specifically) greedily (Design and prototype for root-ish task deprioritization by withholding tasks on the scheduler, #6560)
- Do not assign not-ready (dependents) just-in-time but ahead-of-time; typically referred to as speculative task assignment
We are approaching consensus that either solution would address this problem, but they take almost orthogonal approaches to scheduling. Both solutions come with benefits, opportunities, shortcomings, costs and risks. Task withholding is currently the most likely short-term fix since it requires comparatively few adjustments to the code base and can be implemented scheduler-side only.
A common theme between both approaches is how we detect "special" tasks, and I would like to start a conversation about generalizing the approach taken for root tasks and discuss how it could be expanded.
How are root tasks detected?
Root tasks can trivially be detected by walking up and down the task graph, but this algorithm has quadratic runtime and is not feasible to run for every task.
The current approach instead uses `TaskGroup`s to infer the root task property:
distributed/distributed/scheduler.py, lines 1799 to 1806 at a1d2011:

```python
# Group is larger than cluster with few dependencies?
# Minimize future data transfers.
if (
    valid_workers is None
    and len(tg) > self.total_nthreads * 2
    and len(tg.dependencies) < 5
    and sum(map(len, tg.dependencies)) < 5
):
```
The dynamic component of this if clause (`valid_workers is None and len(tg) > self.total_nthreads * 2`) is there to protect us from making a bad scheduling decision that would reduce our ability to parallelize.
The static components, `len(tg.dependencies) < 5` and `sum(map(len, tg.dependencies)) < 5`, are what we actually use to classify the root tasks; they are a way of describing that we're dealing with a task that has few, relatively small dependencies.
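For illustration, the heuristic above can be expressed as a standalone predicate. The `TaskGroupStub` class below is a minimal stand-in of my own for distributed's `TaskState`/`TaskGroup` machinery, not the actual implementation:

```python
from dataclasses import dataclass, field


@dataclass
class TaskGroupStub:
    """Minimal stand-in for distributed's TaskGroup (illustration only)."""

    n_tasks: int  # len(tg): number of tasks in this group
    dependencies: list = field(default_factory=list)  # dependency groups

    def __len__(self) -> int:
        return self.n_tasks


def is_rootish(tg: TaskGroupStub, total_nthreads: int, valid_workers=None) -> bool:
    # Dynamic component: only treat the group specially if no worker
    # restriction is in place and the group is large enough that the
    # decision cannot hurt our ability to parallelize.
    if valid_workers is not None or len(tg) <= total_nthreads * 2:
        return False
    # Static component: "few, relatively small" dependency groups.
    return len(tg.dependencies) < 5 and sum(map(len, tg.dependencies)) < 5


# A 1000-task group reading from a single tiny input group is root-ish:
big = TaskGroupStub(n_tasks=1000, dependencies=[TaskGroupStub(n_tasks=1)])
assert is_rootish(big, total_nthreads=8)
```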
This classification could be promoted to an actual TaskState property, which has a couple of immediate benefits:
1. We can compute it once, e.g. after `update_graph` or after `dask.order`. While this is not a costly computation, there is no need to do it at runtime.
2. Given this information is available all the time, we can visualize it on the dashboard and/or in other debugging/instrumentation tooling.
3. This static information could be broadcast to workers (if there is need for it).
4. Given that this is a static graph property, this would open an easy avenue to unit-test our task categorization logic, similar to how we nail down `dask.order` by providing specific examples.
5. If this system is roughly formalized, task annotations (or a similar/parallel mechanism) could be used to inform scheduling decisions as well (up to building an entire dispatching mechanism).
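As a sketch of what a one-shot static classification with example-based tests (similar to how dask.order is tested) could look like: the function name `classify_rootish`, the key-prefix grouping as a crude stand-in for TaskGroups, and the thresholds are all assumptions of mine, not the actual implementation.

```python
from collections import defaultdict


def classify_rootish(dependencies: dict, nthreads: int) -> set:
    """Hypothetical one-shot pass: given {key: set_of_dependency_keys},
    group tasks by key prefix (a crude stand-in for TaskGroup) and mark
    groups that look root-ish.  Returns the set of root-ish keys."""
    groups = defaultdict(list)
    for key in dependencies:
        prefix = key[0] if isinstance(key, tuple) else str(key).rsplit("-", 1)[0]
        groups[prefix].append(key)

    rootish = set()
    for prefix, keys in groups.items():
        # Dependencies of the whole group, excluding intra-group edges
        dep_keys = set().union(*(dependencies[k] for k in keys)) - set(keys)
        if len(keys) > nthreads * 2 and len(dep_keys) < 5:
            rootish.update(keys)
    return rootish


# dask.order-style example graph: 20 "read" tasks fanning out of one seed
deps = {"seed-0": set()}
deps.update({("read", i): {"seed-0"} for i in range(20)})
marked = classify_rootish(deps, nthreads=4)
assert all(("read", i) in marked for i in range(20))
assert "seed-0" not in marked
```

Because the input is a plain dict, such a pass could run right after graph materialization and be pinned down with small example graphs in the test suite.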
How can we use this? What's the link to STA?
2.) will be very useful from a UX perspective. Our scheduling is already relatively opaque; if we now start to withhold some tasks because they are special, it would be nice if we could visualize this (e.g. as part of the dask.order visualization, the graph bokeh dashboard, or annotations when hovering over the task stream).
From a developer's perspective, I strongly believe this is equally helpful when talking about less trivial (non-root) root-ish task graphs.
Apart from UX, I consider 2.), 4.) and to a lesser extent also 3.) as valuable components for further research in this space. For instance, are all commonly appearing "root-ish" tasks part of the same category of subtopologies or do they break up into further categories? Are there any topologies for which we know that we can afford exact solutions[1]?
How valuable would it be to introduce manual annotations that mark certain tasks as special (for instance, data generators, high memory use, reducers, ...)? I'm sure there is more.
Apart from the worker-side state machine changes (which I consider quite doable after #5736), one of the big questions remaining for STA is "which tasks should we schedule ahead of time?"
As outlined above, I consider root task withholding and STA of dependents to be symmetrical problems when it comes to this decision. I believe further research into these task classifications could inform future decisions for both approaches.
[1] For example, think of trivial map-reduce/tree-reduce patterns. If we can detect that we're operating on these topologies, it is almost trivial to implement perfect scheduling. Once we have classified a task as root-ish, we could further probe/analyze whether we are dealing with a trivial reduction and implement a special scheduling decision for it.
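As a hedged illustration of [1], a trivial tree-reduction can be recognized purely structurally: every inner node reduces a small, bounded number of inputs and feeds at most one consumer, converging on a single output. The function name, the `split_every` threshold, and the dict-based graph representation below are my assumptions, not an existing API:

```python
def is_trivial_tree_reduce(dependencies: dict, dependents: dict,
                           split_every: int = 8) -> bool:
    """Hypothetical check: does this (sub)graph look like a plain
    tree-reduction?  Every node reduces at most `split_every` inputs
    and feeds at most one consumer; the tree converges on one output."""
    sinks = [k for k, d in dependents.items() if not d]
    if len(sinks) != 1:
        return False  # a tree-reduce converges on a single output task
    return all(
        len(deps) <= split_every and len(dependents[k]) <= 1
        for k, deps in dependencies.items()
    )


# 4 leaves -> 2 partial reductions -> 1 final reduction
deps = {
    "leaf-0": set(), "leaf-1": set(), "leaf-2": set(), "leaf-3": set(),
    "partial-0": {"leaf-0", "leaf-1"}, "partial-1": {"leaf-2", "leaf-3"},
    "final": {"partial-0", "partial-1"},
}
dependents = {k: {c for c, d in deps.items() if k in d} for k in deps}
assert is_trivial_tree_reduce(deps, dependents)
```

If a root-ish group's dependents pass a check like this, the scheduler could in principle pre-place each reduction subtree on one worker, which is what makes the "exact solution" cheap for this topology.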
cc @gjoseph92