[HACK] Ordering to prioritize "shuffle-split" #6051
Conversation
cc @eriknw
…On Tue, Mar 31, 2020 at 7:29 AM Mads R. B. Kristensen < ***@***.***> wrote:
This PR is a HACK to do *breadth-first* ordering of shuffle-split tasks.
Dask's scheduling policy is, generally speaking, *depth-first*, which works
great in most cases. However, it can increase memory usage significantly when
we split the output of a task into many small tasks (as in
rearrange_by_column_tasks()
<https://github.com/dask/dask/blob/fa63ce13ee1773d2042654a26a479bce932f292e/dask/dataframe/shuffle.py#L423>'s
shuffle-group and shuffle-split tasks). In this case *depth-first* ordering
delays the freeing of each shuffle-group output until the end of the
*shuffling*, which uses much more memory than a *breadth-first* ordering,
where all the shuffle-split tasks finish immediately and the output of
shuffle-group can be freed before continuing.
This is a HACK; please don't merge.
cc. @rjzamora <https://github.com/rjzamora>, @beckernick
<https://github.com/beckernick>
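To make the memory argument above concrete, here is a small pure-Python toy (not Dask's scheduler, just an illustrative model): each of N "group" tasks splits into N pieces, and a group's output can be freed once all of its pieces have been consumed. Visiting one piece per group before moving on (the depth-first-like order) keeps every group's output alive at once, while finishing all pieces of one group first frees each group immediately.

```python
# Toy model: count how many "shuffle-group"-style outputs are alive at once
# under two consumption orders. Purely illustrative; names are hypothetical.

N = 4  # number of groups, each split into N pieces

def peak_live_groups(events):
    """events: sequence of (group, piece) consumption events."""
    remaining = {}  # group -> pieces not yet consumed
    live = set()    # groups whose output is currently held in memory
    peak = 0
    for group, piece in events:
        remaining.setdefault(group, set(range(N)))
        live.add(group)
        remaining[group].discard(piece)
        if not remaining[group]:
            live.discard(group)  # all pieces consumed: free the group
        peak = max(peak, len(live))
    return peak

# Depth-first-like: take piece 0 of every group, then piece 1 of every group, ...
depth_first = [(g, p) for p in range(N) for g in range(N)]
# Breadth-first-like: finish all pieces of one group before starting the next.
breadth_first = [(g, p) for g in range(N) for p in range(N)]

print(peak_live_groups(depth_first))    # -> 4 (all groups alive at once)
print(peak_live_groups(breadth_first))  # -> 1 (one group alive at a time)
```

This is the difference the PR exploits: finishing the shuffle-split tasks early lets each shuffle-group output be freed before the shuffle continues.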
I can confirm that this helps dramatically for our problematic use case. What is the path forward to officially enable an optional breadth-first execution?
@madsbk can you share a code snippet that exercises this, and perhaps a couple of graphs of
@TomAugspurger I'll provide a clean example in a gist as soon as I can |
@TomAugspurger do you expect this to change the task graph itself, or just the order of execution? Visually, these should be the same before and after, right? cc @rjzamora
Just the order of execution. The
The following example task graph visualizations come from the code snippet at the bottom. The snippet just creates a dataset and runs hash-based repartitioning. One is from this PR, one is from the current release (2.13). Apologies in advance that they use a GPU.

This is the current execution order: (image)

This is the execution order on this PR: (image)

These were generated from the following code snippet:

```python
import sys

import cudf
import dask_cudf

filename = sys.argv[1]

df = cudf.datasets.randomdata(100000)
ddf = dask_cudf.from_cudf(df, 10)
ddf = ddf.repartition(columns="id")
ddf.visualize(
    color="order", cmap="autumn", node_attr={"penwidth": "4"},
    filename=f"(unknown)"
)
```
Thanks. Is

```python
In [1]: import dask.datasets

In [2]: ts = dask.datasets.timeseries()

In [3]: result = ts.set_index("id")
```

We shouldn't need anything with a cluster / distributed, since this is the static ordering done prior to sending the task graph to the scheduler.

Edit: the task graph from my example looks quite different, so it may not be representative of the original problem.
Ah, right. Good point on not needing the cluster 😄 This operation is similar in nature to
@TomAugspurger - A good dask-only reproducer is something like this:

```python
import dask.dataframe as dd
import numpy as np
import pandas as pd

size = 48
df = pd.DataFrame(
    {
        "index": np.random.choice(list(range(4)), size),
        "a": np.arange(size),
    }
)
ddf = dd.from_pandas(df, npartitions=4)
result = dd.shuffle.rearrange_by_divisions(
    ddf, "index", (0, 1, 2, 3, 4), shuffle="tasks"
)
```

Note that the
dask/order.py (outdated diff):

```python
# TODO: Hack to prioritize "shuffle-split"
shuffle_split_keys = []
for k in result.keys():
    if len(k) > 0 and "shuffle-split" in k[0]:
```
What about using a general label that any API or user could add to the key of a task? For example, something like "dsk-prioritize":

```python
# Prioritize tasks with "dsk-prioritize" annotation
for k in list(result.keys()):
    if k and "dsk-prioritize" in k[0]:
        result[k] = 0
```

I know you mentioned off-line that the best general solution would be to annotate the priority within the task. Is this what you had in mind, or something more sophisticated? I guess the specific priority could also be included in the annotation if the user/API wants really fine-grained control.
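As a self-contained sketch of that suggestion (a hypothetical helper, not an actual Dask API): after an ordering pass has produced its usual priorities, force any task whose key name contains the marker to the front. Keys here follow Dask's `(name, index)` tuple convention, and lower priority values run first.

```python
# Hypothetical post-processing step over an ordering result: any key whose
# name contains the marker string is bumped to priority 0 (ties are fine).
def apply_priority_annotation(priorities, marker="dsk-prioritize"):
    out = dict(priorities)
    for key in out:
        if isinstance(key, tuple) and key and marker in key[0]:
            out[key] = 0  # jump annotated tasks to the front
    return out

prios = {
    ("load", 0): 0,
    ("dsk-prioritize-split", 0): 3,
    ("agg", 0): 4,
}
print(apply_priority_annotation(prios)[("dsk-prioritize-split", 0)])  # -> 0
```

Overloading key names like this is fragile (any task whose name happens to contain the marker is affected), which is part of why the discussion below moves toward proper task annotations.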
Yes, something like this, but overloading key names is probably too much of a hack to be accepted :)
For reference, these graphs visualize as the following: (image) The current execution: (image)
Opened a discussion of a better solution than this hack :)
Thanks for raising this @madsbk, and for producing images and reproducers @beckernick and @rjzamora.

Whenever I come across a case where someone has made a particular workflow faster/better by working around Dask's scheduling heuristics, I try to see if there is a way to generalize the improvement, rather than finding ways to make the workaround easier. Early on we had lots of these situations, and being disciplined about learning from special-case improvements and applying those lessons to the global heuristics made our heuristics decent over time (these sorts of situations are much less common today).

To that end, I'm curious about how Dask's ordering got this wrong. I'd like to dig into this comment from @madsbk:
First, you might want to try #5872 by @eriknw, which changes around ordering. I'm not confident that it will resolve your problem here, but it should be easy to try.

Second, is there anything we can learn from this so that Dask makes the right decision automatically, rather than requiring special input from the user? The ordering code is challenging to get into today, but it may be that @eriknw (who I believe is lurking on this thread) could help if we could find some improvement to make.
Let me confirm what's happening here. The data from a task such as

I don't think there is much

In this example, are the tasks that create small data getters such as
We totally agree @mrocklin - we don't want to implement/maintain any workarounds unless it proves completely necessary :)

@eriknw - Yes, the tasks returning "large" data are actually taking in a single dataframe partition and then splitting the partition into a dictionary of smaller dataframes. The "shuffle-split" tasks are just calling
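The pattern described above can be sketched in plain Python (no cudf/pandas; names are illustrative stand-ins): "shuffle-group" hashes the rows of one partition into a dict of smaller buckets, and each "shuffle-split" merely fetches one bucket from that dict.

```python
# Stand-in for the shuffle pattern: one "large" split output, many tiny getters.
def shuffle_group(partition, npartitions):
    buckets = {i: [] for i in range(npartitions)}
    for row in partition:
        buckets[hash(row) % npartitions].append(row)
    return buckets  # one "large" output that holds every bucket

def shuffle_split(buckets, i):
    return buckets.get(i)  # tiny task: a single dict lookup

groups = shuffle_group(range(8), npartitions=2)
pieces = [shuffle_split(groups, i) for i in range(2)]
print(pieces[0])  # -> [0, 2, 4, 6] (hash(n) == n for small ints in CPython)

# The dict can only be freed once every shuffle_split for it has run;
# scheduling those splits late keeps the whole dict alive.
```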
I've continued to ponder a more general solution. It may not be unreasonable to identify and keep track of tasks that are expected to be much smaller than their dependencies. By knowing tiny tasks, both

I don't have a strong opinion on how this information is created and managed (such as from task annotations vs. other). My PoC will probably accept a set of "tiny_keys".
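A rough illustration of that "tiny_keys" idea (a hypothetical helper, not Dask's actual `order()`): walk an existing schedule, and whenever a task finishes, immediately run any now-ready dependents flagged as tiny before moving on.

```python
# Hypothetical reordering pass: tiny dependents jump the queue as soon as
# all of their dependencies are done, so large intermediates free up early.
def reorder_with_tiny(schedule, dependencies, dependents, tiny_keys):
    done, out = set(), []

    def ready(k):
        return all(dep in done for dep in dependencies.get(k, ()))

    def run(k):
        done.add(k)
        out.append(k)
        for d in dependents.get(k, ()):
            if d in tiny_keys and d not in done and ready(d):
                run(d)  # run tiny dependents immediately

    for k in schedule:
        if k not in done:
            run(k)
    return out

# Shuffle-ish toy: "group" feeds three tiny "split" tasks.
dependencies = {"split0": {"group"}, "split1": {"group"}, "split2": {"group"}}
dependents = {"group": ["split0", "split1", "split2"]}
schedule = ["group", "other", "split0", "split1", "split2"]
print(reorder_with_tiny(schedule, dependencies, dependents,
                        {"split0", "split1", "split2"}))
# -> ['group', 'split0', 'split1', 'split2', 'other']
```

With the splits pulled forward, "group" has no remaining dependents by the time "other" runs, so its output could be released first.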
I'm excited to see activity on this PR. Now that annotations are in, I'm hopeful that this will be an easier win.
Yeah, I finally got around to looking at this again :)




This PR is a HACK to do breadth-first ordering of `shuffle-split` tasks.

Dask's scheduling policy is, generally speaking, depth-first, which works great in most cases. However, it can increase memory usage significantly when we split the output of a task into many small tasks (as in `rearrange_by_column_tasks()`'s `shuffle-group` and `shuffle-split` tasks). In this case depth-first ordering delays the freeing of each `shuffle-group` output until the end of the shuffling, which uses much more memory than a breadth-first ordering, where all the `shuffle-split` tasks finish immediately and the output of `shuffle-group` can be freed before continuing.

This is a HACK; please don't merge. Let's find a more general solution to this issue.

cc. @rjzamora, @beckernick