
Shuffle groupby default #9453

Merged
ian-r-rose merged 5 commits into dask:main from ian-r-rose:shuffle-groupby-default
Sep 13, 2022

Conversation

@ian-r-rose (Collaborator)

I expect this to conflict significantly with the proposed alternative in #9446, though this PR is much less ambitious. I'm just trying to select some defaults that are likely to have better performance based on some of the investigations in #9406 (comment).

I feel pretty good about the choice of having shuffle on-by-default when split_out > 1. I'm less sure about what to do about split_every. When we have fewer, larger partitions, we probably want it to be close to one. When we have many smaller partitions, it probably makes sense for it to be eight or sixteen, as @rjzamora was finding (note that in this context split_every is more of a repartitioning granularity rather than a tree-reduction width). I am not sure if there really is a great solution short of the things that @rjzamora is trying in #9446, since we don't typically know partition sizes ahead of time. (Though we could in some instances, like with parquet!)
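The trade-off described here can be sketched with a toy cost model (a hypothetical helper, not dask internals): each combine task concatenates up to split_every intermediate results, so its peak input size grows with split_every, while the number of reduction rounds shrinks as split_every grows.

```python
import math

def combine_cost(npartitions, partition_bytes, split_every):
    """Toy model of a split_every-ary combine step: returns the number
    of reduction rounds and the rough peak bytes concatenated by any
    single combine task. (Assumed model, not dask's implementation.)"""
    assert split_every >= 2, "a fan-in below 2 makes no progress"
    rounds, n = 0, npartitions
    while n > 1:
        n = math.ceil(n / split_every)
        rounds += 1
    peak = split_every * partition_bytes  # each task concatenates this much
    return rounds, peak

# Few, large partitions: a small split_every keeps per-task memory low.
# Many, small partitions: split_every of 8 or 16 stays cheap and shallow.
```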

split_every=1 for shuffle-based groupby
@ian-r-rose (Collaborator, Author)

Okay, I take it back, split_every=1 as a default for shuffling is a bad idea. But I still think it's worthwhile to have it as an option, as I do think it is the right choice when there are few, large partitions per worker.

This is largely orthogonal to the choice of when to default to the shuffle-based groupby, which I still believe should just be when split_out > 1.

@rjzamora (Member) left a comment

Thanks @ian-r-rose ! I don't think I agree with the split_every analysis, but I definitely agree that the shuffle-based algorithm should be default for split_out>1.

I expect this to conflict significantly with the proposed alternative in #9446

I wouldn't worry about that PR (I will most likely close it). I'm not crazy about adding a new keyword argument to aggregate anyway :)

Comment on lines +1662 to +1666:

    if shuffle is None:
        if split_out > 1:
            shuffle = shuffle or config.get("shuffle", None) or "disk"
        else:
            shuffle = False
@rjzamora (Member)

I'm not sure that we ever want to use shuffle ="disk" by default. It is quite slow (definitely slower than ACA).

@ian-r-rose (Collaborator, Author)

I was also unsure about this -- I did this in the interests of being consistent with the other places where a default shuffle implementation is chosen (basically, default to disk, unless there is a Client active).

Do you think disk is also a bad idea there?

@rjzamora (Member)

Yes (sorry for the delayed response). My personal opinion is that shuffle="disk" should never be default behavior. This PR is using shuffle="task"/"p2p" benchmark results to rationalize a change in default behavior. I don't expect shuffle="disk" to be very performant at all.

@ian-r-rose (Collaborator, Author)

Since this is new functionality, I would be comfortable changing the default to "tasks", despite the inconsistency with other shuffling defaults.

I can also open a new issue for making "tasks" the default everywhere.
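The revised default under discussion could look something like the following sketch (hypothetical helper, not the actual dask source): an explicit user choice always wins, a configured method comes next, and "tasks" rather than "disk" is the fallback when split_out > 1.

```python
def default_shuffle_method(shuffle, split_out, configured=None):
    """Hypothetical sketch of the default-selection logic discussed
    above, with "tasks" (not "disk") as the fallback."""
    if shuffle is not None:
        return shuffle                # explicit user choice wins
    if split_out > 1:
        return configured or "tasks"  # configured method, else "tasks"
    return False                      # single output partition: use ACA
```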

@ian-r-rose (Collaborator, Author)

Thanks for taking a look @rjzamora! I'm now backing off of my initial proposal to make split_every default to 1, but I think I'd still advocate for it to be allowed to be one in this case.

I don't like that I haven't been able to think of a good heuristic for what it should be, but all I can think of involves comparing number of partitions, representative partition sizes, and number of workers.

@rjzamora (Member) commented Sep 2, 2022

Thanks for taking a look @rjzamora! I'm now backing off of my initial proposal to make split_every default to 1, but I think I'd still advocate for it to be allowed to be one in this case.

That seems fair to me. For both the shuffle and aca algorithms, split_every is meant to specify how many chunk-task outputs can be safely (and perhaps efficiently) concatenated together. In the aca algorithm, this is used to specify the "k" in the k-ary reduction tree, while in the shuffle algorithm it tells us how many adjacent partitions should be coalesced after the initial blockwise chunk aggregation. These concepts are very similar, but the lower limits happen to be slightly different.

Take-away: We should indeed support a distinct "lower-limit" on split_every for the shuffle-based algorithm.
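This distinction can be made concrete with a small sketch (assumed semantics, not dask source): in aca, split_every is the fan-in "k" of the reduction tree, so values below 2 make no progress, while in the shuffle algorithm it is a coalescing factor for which 1 is a legitimate lower limit meaning "do not coalesce at all".

```python
import math

def valid_split_every(algorithm, split_every):
    """Distinct lower limits for the two algorithms (assumed semantics)."""
    if algorithm == "aca":
        return split_every >= 2   # tree fan-in below 2 makes no progress
    if algorithm == "shuffle":
        return split_every >= 1   # 1 simply means "no coalescing"
    raise ValueError(algorithm)

def shuffled_output_partitions(npartitions, split_every):
    # Coalescing split_every adjacent partitions after the blockwise step
    return math.ceil(npartitions / split_every)
```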

@jrbourbeau (Member) left a comment

Just checking in here, it looks like there's good agreement -- is this safe to merge?

@rjzamora (Member) commented Sep 6, 2022

it looks like there's good agreement -- is this safe to merge?

I think the only sticking point for me is that I'd like to avoid using shuffle="disk" by default on the threaded scheduler - I'd expect that to be a significant performance regression compared to shuffle=False. Therefore, we need to use shuffle="tasks" explicitly, or somehow make the shuffle default scheduler-dependent.
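One way to make the default scheduler-dependent, sketched with a hypothetical helper (dask's real dispatch is more involved): fall back to "tasks" rather than "disk" when no distributed client is active.

```python
def pick_shuffle_backend(split_out, has_distributed_client):
    """Hypothetical scheduler-dependent default that never picks "disk"."""
    if split_out <= 1:
        return False  # ACA path; no shuffle needed
    # With a distributed client a distributed shuffle is available;
    # on the threaded scheduler fall back to "tasks" instead of "disk".
    return "p2p" if has_distributed_client else "tasks"
```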

@mrocklin (Member) commented Sep 6, 2022 via email

@rjzamora (Member) commented Sep 6, 2022

Disk is much better for the local scheduler. It has the same scalability
and out of memory properties as the p2p shuffle

This is probably true for larger-than-memory data, but I definitely see a significant performance regression for "large" data (5-6GB) that still fits comfortably in memory:

    from dask.datasets import timeseries

    ddf = timeseries(end='2002-01-01', id_lam=1e12)

    %time ddf.groupby("id").agg({"x": "mean"}, split_out=4, shuffle=False).compute()
    # Wall time: 12.9 s

    %time ddf.groupby("id").agg({"x": "mean"}, split_out=4, shuffle="tasks").compute()
    # Wall time: 13.2 s

    %time ddf.groupby("id").agg({"x": "mean"}, split_out=4, shuffle="disk").compute()
    # Wall time: 31 s

Generally speaking, the shuffle-vs-aca decision is a bit more complicated for the threaded scheduler.

@jrbourbeau mentioned this pull request on Sep 13, 2022
@ian-r-rose (Collaborator, Author)

From my end, this is ready. Is there anything else to be done from your perspective @rjzamora?

@rjzamora (Member) left a comment

LGTM - Thanks @ian-r-rose !


Development

Successfully merging this pull request may close these issues.

Shuffle-based groupby aggregation by default

4 participants