
[HACK] Ordering to priorities "shuffle-split" #6051

Closed
madsbk wants to merge 2 commits into dask:main from madsbk:shuffle_split_high_priority

Conversation

@madsbk
Contributor

@madsbk madsbk commented Mar 31, 2020

This PR is a HACK to do breadth-first ordering of shuffle-split tasks.

Dask's scheduling policy is, generally speaking, depth-first, which works great in most cases. However, it can increase memory usage significantly when we split the output of a task into many small tasks (like rearrange_by_column_tasks()'s shuffle-group and shuffle-split tasks). In this case, depth-first delays the freeing of each shuffle-group output until the end of the shuffle, which uses much more memory than a breadth-first ordering, where all the shuffle-split tasks finish immediately and the output of shuffle-group can be freed before continuing.
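For intuition, the memory effect described above can be simulated with a stdlib-only toy (all names and sizes here are made up, not real dask code): four "group" tasks each feed four tiny "split" tasks, and a group's output can be freed once all of its splits have run.

```python
# Toy model of the shuffle-group / shuffle-split memory problem
# (hypothetical sizes; not real dask code).

GROUPS = 4        # number of shuffle-group tasks
SPLITS = 4        # shuffle-split tasks per group
GROUP_SIZE = 100  # "memory" held by one shuffle-group output
SPLIT_SIZE = 1    # "memory" held by one shuffle-split output

def peak_memory(order):
    """Replay an execution order, freeing a group's output once all of
    its splits have run, and report the peak live memory observed."""
    live = {}                                   # task -> size currently held
    remaining = {g: SPLITS for g in range(GROUPS)}
    peak = 0
    for task in order:
        if task[0] == "group":
            live[task] = GROUP_SIZE
        else:                                   # ("split", g, i)
            _, g, _ = task
            live[task] = SPLIT_SIZE
            remaining[g] -= 1
            if remaining[g] == 0:
                del live[("group", g)]          # parent can be freed
        peak = max(peak, sum(live.values()))
    return peak

# Depth-first-ish: run every group before any split finishes its parent.
depth_first = [("group", g) for g in range(GROUPS)] + [
    ("split", g, i) for g in range(GROUPS) for i in range(SPLITS)
]

# Breadth-first: finish each group's splits before starting the next group.
breadth_first = []
for g in range(GROUPS):
    breadth_first.append(("group", g))
    breadth_first += [("split", g, i) for i in range(SPLITS)]

print(peak_memory(depth_first))    # all group outputs alive at once
print(peak_memory(breadth_first))  # at most one group output alive
```

The breadth-first replay never holds more than one large group output, while the depth-first replay holds all of them simultaneously.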

This is a HACK; please don't merge. Let's find a more general solution to this issue.

cc. @rjzamora, @beckernick

@mrocklin
Member

mrocklin commented Mar 31, 2020 via email

@rjzamora
Member

I can confirm that this helps dramatically for our problematic use case. What is the path forward to officially enable an optional breadth-first execution?

@TomAugspurger
Member

@madsbk can you share a code snippet that exercises this, and perhaps a couple of graphs from visualize(color="order", cmap="autumn", node_attr={"penwidth": "4"}) showing the before & after?

@beckernick
Member

beckernick commented Mar 31, 2020

@TomAugspurger I'll provide a clean example in a gist as soon as I can

@beckernick
Member

beckernick commented Mar 31, 2020

@TomAugspurger do you expect this to change the task graph itself, or just the order of execution? Visually, these should be the same before and after, right? cc @rjzamora

@TomAugspurger
Member

Just the order of execution. The color="order" is what would change the visual representation.

@beckernick
Member

beckernick commented Mar 31, 2020

The following example task graph visualizations come from the code snippet at the bottom. This snippet just creates a dataset and runs hash-based repartitioning. One is from this PR, one is from the current release (2.13). Apologies in advance that they use a GPU.

This is the current execution order:

[image: current_dask_graph]

This is the execution order on this PR:

[image: new_dask_graph]

These were generated from the following code snippet:

import sys

import dask_cudf
import cudf

filename = sys.argv[1]

df = cudf.datasets.randomdata(100000)
ddf = dask_cudf.from_cudf(df, 10)

ddf = ddf.repartition(columns="id")

ddf.visualize(
    color="order", cmap="autumn", node_attr={"penwidth": "4"},
    filename=filename,
)

cc @TomAugspurger @madsbk @rjzamora @mrocklin

@TomAugspurger
Member

TomAugspurger commented Mar 31, 2020

Thanks. Is dask_cudf.repartition similar to dask.dataframe.DataFrame.set_index? If so, this may be a reproducer with just Dask:

In [1]: import dask.datasets

In [2]: ts = dask.datasets.timeseries()

In [3]: result = ts.set_index("id")

We shouldn't need anything with a cluster / distributed, since this is the static ordering done prior to sending the task graph to the scheduler.

Edit: the task graph from my example looks quite different, so it may not be representative of the original problem.

@beckernick
Member

Ah, right. Good point on not needing the cluster 😄

This operation is similar in nature to set_index, but we don't end up with a difference due to the lack of an explicit shuffle-split task (this PR only looks for that string in the task key). The set_index op has shuffle-collect tasks instead.

@rjzamora
Member

@TomAugspurger - A good dask-only reproducer is something like this:

import dask.dataframe as dd
import pandas as pd
import numpy as np

size = 48
df = pd.DataFrame(
    {
        "index": np.random.choice(list(range(4)), size),
        "a": np.arange(size),
    }
)
ddf = dd.from_pandas(df, npartitions=4)

result = dd.shuffle.rearrange_by_divisions(
    ddf, "index", (0, 1, 2, 3, 4), shuffle="tasks"
)

Note that the repartition(columns=) API in dask_cudf is using something very similar to Dask's rearrange_by_column_tasks (which also uses the "shuffle-split" name convention).

dask/order.py Outdated
# TODO: Hack to prioritize "shuffle-split"
shuffle_split_keys = []
for k in result.keys():
    if len(k) > 0 and "shuffle-split" in k[0]:
Member

What about using a general label that any API or user could add to the key of a task? For example, something like "dsk-prioritize":

# Prioritize tasks with "dsk-prioritize" annotation
for k in list(result.keys()):
    if k and "dsk-prioritize" in k[0]:
        result[k] = 0

I know you mentioned off-line that the best general solution would be to annotate the priority within the task. Is this what you had in mind - or something more sophisticated? I guess the specific priority could also be included in the annotation if the user/api wants really fine-grained control.
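To make the suggestion above concrete, here is a stdlib-only toy (the key names are invented; `result` stands in for the dict returned by dask.order.order, where a lower number means "run earlier"):

```python
# Toy `result` dict standing in for the output of dask.order.order
# (key names are invented for illustration; lower number = run earlier).
result = {
    ("read-1", 0): 3,
    ("dsk-prioritize-split-1", 0): 7,
    ("dsk-prioritize-split-1", 1): 9,
    ("sum-1", 0): 12,
}

# The suggested scan: pull any annotated key to the front of the order.
for k in list(result.keys()):
    if k and "dsk-prioritize" in k[0]:
        result[k] = 0

print(result)  # annotated keys now have priority 0, the rest are untouched
```

Any API or user that wanted breadth-first behavior would only have to embed the label in its task names.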

Contributor Author

Yes, something like this, but overloading key names is probably too much of a hack to be accepted :)

@beckernick
Member

beckernick commented Mar 31, 2020

@TomAugspurger - A good dask-only reproducer is something like this:

import dask.dataframe as dd
import pandas as pd
import numpy as np

size = 48
df = pd.DataFrame(
    {
        "index": np.random.choice(list(range(4)), size),
        "a": np.arange(size),
    }
)
ddf = dd.from_pandas(df, npartitions=4)

result = dd.shuffle.rearrange_by_divisions(
    ddf, "index", (0, 1, 2, 3, 4), shuffle="tasks"
)

Note that the repartition(columns=) API in dask_cudf is using something very similar to Dask's rearrange_by_column_tasks (which also uses the "shuffle-split" name convention).

For reference, these graphs visualize as the following:

The current execution:

[image: current_dask_graph (2)]

This PR
[image: new_dask_graph (2)]

@madsbk
Contributor Author

madsbk commented Apr 1, 2020

Opened a discussion of a better solution than this hack :)
#6054

@mrocklin
Member

mrocklin commented Apr 1, 2020

Thanks for raising this, @madsbk, and for producing images and reproducers, @beckernick and @rjzamora.

Whenever I come across a case where someone has made a particular workflow faster/better by working around Dask's scheduling heuristics I try to see if there is a way to generalize the improvement, rather than finding ways to make the workaround easier. Early on we had lots of these situations, and being disciplined about learning from special-case improvements and applying those lessons to the global heuristics made it so that our heuristics became decent over time (these sorts of situations are much less common today).

To that end, I'm curious about how Dask's ordering got this wrong. I'd like to dig into this comment from @madsbk :

Dask's scheduling policy is, generally speaking, depth-first, which works great in most cases. However, it can increase memory usage significantly when we split the output of a task into many small tasks (like rearrange_by_column_tasks()'s shuffle-group and shuffle-split tasks). In this case, depth-first delays the freeing of each shuffle-group output until the end of the shuffle, which uses much more memory than a breadth-first ordering, where all the shuffle-split tasks finish immediately and the output of shuffle-group can be freed before continuing.

First, you might want to try #5872 by @eriknw , which changes around ordering. I'm not confident that it will resolve your problem here, but it should be easy to try.

Second, is there anything we can learn from this so that Dask makes the right decision automatically, rather than requiring special input from the user? The ordering code is challenging to get into today, but it may be that @eriknw (who I believe is lurking on this thread) could help if we could find some improvement to make.

@eriknw
Member

eriknw commented Apr 1, 2020

Let me confirm what's happening here. The data from a task such as (1, (0,)) in #6051 (comment) is large. All of the dependents of this task create small data. In this case, we want to compute all the dependents so we can release the parent data. In other words, the total size of all dependents is at most comparable to the size of the parent.

I don't think there is much order can do here without knowing information about which tasks are small. If there was even only one other large dependent, then calculating all dependents is the wrong thing to do. If we had estimates of the size of each task, then both fuse and order could do better.

In this example, are the tasks that create small data getters such as getitem?

@rjzamora
Member

rjzamora commented Apr 1, 2020

...I try to see if there is a way to generalize the improvement, rather than finding ways to make the workaround easier.

We totally agree, @mrocklin. We don't want to implement/maintain any workarounds unless it proves completely necessary :)

In this example, are the tasks that create small data getters such as getitem?

@eriknw - Yes, the tasks returning "large" data are actually taking in a single dataframe partition and then splitting the partition into a dictionary of smaller dataframes. The "shuffle-split" tasks are just calling getitem on a single element of that large dictionary.
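That structure can be sketched without dask at all (key names and the mini executor here are hypothetical stand-ins): one task splits a partition into a dict, and each "shuffle-split"-style task is just operator.getitem on one element of it.

```python
import operator

def split_partition(part, nsplits):
    # "shuffle-group"-like task: one large partition becomes a dict of pieces
    return {i: part[i::nsplits] for i in range(nsplits)}

# A miniature task graph in dask's tuple convention (key names hypothetical)
dsk = {
    "part": list(range(12)),
    "group": (split_partition, "part", 3),
    ("split", 0): (operator.getitem, "group", 0),
    ("split", 1): (operator.getitem, "group", 1),
    ("split", 2): (operator.getitem, "group", 2),
}

def get(dsk, key):
    """Tiny recursive executor standing in for a real dask scheduler
    (no caching: "group" is recomputed for every split)."""
    task = dsk[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        return func(*(get(dsk, a) if isinstance(a, (str, tuple)) and a in dsk else a
                      for a in args))
    return task

print(get(dsk, ("split", 1)))  # -> [1, 4, 7, 10], one small piece of the big dict
```

Each "split" output is tiny relative to the "group" dict it indexes into, which is exactly why running all splits promptly lets the large intermediate be dropped.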

@madsbk madsbk mentioned this pull request Apr 2, 2020
3 tasks
@madsbk
Contributor Author

madsbk commented Apr 2, 2020

Agreed with @eriknw: just by looking at the structure of the task graph, order() has no way of determining when breadth-first is preferable.

I have implemented a non-intrusive solution here: #6059.

@eriknw
Member

eriknw commented Apr 30, 2020

I've continued to ponder a more general solution.

It may not be unreasonable to identify and keep track of tasks that are expected to be much smaller than their dependencies. dask.array.optimize already does something similar by identifying keys (such as those from GETTERS) to not fuse.

By knowing the tiny tasks, both fuse and order can be smarter and should be able to handle the example of this PR. I bet I can whip up a PoC for order if there's interest.

I don't have a strong opinion how this information is created and managed (such as from task annotations vs other). My PoC will probably accept a set of "tiny_keys".
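One possible shape for such a PoC, as a stdlib-only sketch (not the actual implementation; the graph and key names are invented): a plain Kahn-style toposort that, among ready tasks, always prefers the ones in tiny_keys, so a large parent's dependents finish (and the parent becomes releasable) before the next large task starts.

```python
def order_with_tiny_keys(dependents, dependencies, tiny_keys):
    """Sketch of a tiny-keys-aware static ordering: among ready tasks,
    always run the ones marked tiny first."""
    ndeps = {k: len(v) for k, v in dependencies.items()}
    ready = [k for k, n in ndeps.items() if n == 0]
    out = []
    while ready:
        # tiny tasks sort first; str() gives a stable tie-break
        ready.sort(key=lambda k: (k not in tiny_keys, str(k)))
        task = ready.pop(0)
        out.append(task)
        for dep in dependents.get(task, ()):
            ndeps[dep] -= 1
            if ndeps[dep] == 0:
                ready.append(dep)
    return out

# Two "group" tasks, each with two tiny "split" dependents.
dependencies = {
    "group-0": [], "group-1": [],
    ("split", 0, 0): ["group-0"], ("split", 0, 1): ["group-0"],
    ("split", 1, 0): ["group-1"], ("split", 1, 1): ["group-1"],
}
dependents = {
    "group-0": [("split", 0, 0), ("split", 0, 1)],
    "group-1": [("split", 1, 0), ("split", 1, 1)],
}
tiny = {k for k in dependencies if isinstance(k, tuple) and k[0] == "split"}

# group-0's splits are scheduled before group-1 even starts
print(order_with_tiny_keys(dependents, dependencies, tiny))
```

With an empty tiny_keys set this degrades to an ordinary toposort, which is the "order can't know" situation described above.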

@madsbk madsbk force-pushed the shuffle_split_high_priority branch 2 times, most recently from 76a4898 to e788c9f Compare September 30, 2020 14:45
@madsbk madsbk force-pushed the shuffle_split_high_priority branch 2 times, most recently from 30093bf to 888dcdb Compare October 12, 2020 12:08
Base automatically changed from master to main March 8, 2021 20:19
@madsbk madsbk force-pushed the shuffle_split_high_priority branch from 888dcdb to 5ef58ca Compare June 22, 2021 08:04
@mrocklin
Member

I'm excited to see activity on this PR. Now that annotations are in I'm hopeful that this will be an easier win.

@madsbk
Contributor Author

madsbk commented Jun 23, 2021

I'm excited to see activity on this PR. Now that annotations are in I'm hopeful that this will be an easier win.

Yeah, finally got around to looking at this again :)
Proposed a solution that makes use of our new annotation API: #7826

@madsbk madsbk marked this pull request as ready for review June 23, 2021 12:02
@madsbk madsbk closed this Jun 23, 2021