Conversation
split_every=1 for shuffle-based groupby
Okay, I take it back. This is largely orthogonal to the choice of when to default to the shuffle-based groupby, which I still believe should just be when split_out > 1.
rjzamora
left a comment
Thanks @ian-r-rose ! I don't think I agree with the split_every analysis, but I definitely agree that the shuffle-based algorithm should be default for split_out>1.
I expect this to conflict significantly with the proposed alternative in #9446
I wouldn't worry about that PR (I will most likely close it). I'm not crazy about adding a new keyword argument to aggregate anyway :)
```python
if shuffle is None:
    if split_out > 1:
        shuffle = shuffle or config.get("shuffle", None) or "disk"
    else:
        shuffle = False
```
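The precedence in the snippet above can be paraphrased as a small, self-contained sketch (plain Python, no dask required; `pick_shuffle` and the dict-based config are illustrative stand-ins, not dask API):

```python
# Illustrative stand-in for the default-selection logic under discussion:
# an explicit shuffle argument wins, then the "shuffle" config value,
# then "disk" -- but only when split_out > 1; otherwise shuffling is off.
def pick_shuffle(shuffle=None, split_out=1, config=None):
    config = config or {}
    if shuffle is None:
        if split_out > 1:
            shuffle = config.get("shuffle", None) or "disk"
        else:
            shuffle = False
    return shuffle

print(pick_shuffle(split_out=4))                               # disk
print(pick_shuffle(split_out=1))                               # False
print(pick_shuffle(split_out=4, config={"shuffle": "tasks"}))  # tasks
print(pick_shuffle(shuffle="p2p", split_out=4))                # p2p
```

The point of contention below is only the final `"disk"` fallback, not the precedence order itself.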
I'm not sure that we ever want to use shuffle="disk" by default. It is quite slow (definitely slower than ACA).
I was also unsure about this -- I did this in the interests of being consistent with the other places where a default shuffle implementation is chosen (basically, default to disk, unless there is a Client active).
Do you think disk is also a bad idea there?
Yes (sorry for the delayed response). My personal opinion is that shuffle="disk" should never be default behavior. This PR is using shuffle="task"/"p2p" benchmark results to rationalize a change in default behavior. I don't expect shuffle="disk" to be very performant at all.
Since this is new functionality, I would be comfortable changing the default to "tasks", despite the inconsistency with other shuffling defaults.
I can also open a new issue for making "tasks" the default everywhere.
Thanks for taking a look @rjzamora! I'm now backing off of my initial proposal to make split_every=1 the default. I don't like that I haven't been able to think of a good heuristic for what it should be; all I can think of involves comparing number of partitions, representative partition sizes, and number of workers.
That seems fair to me. For both the shuffle and aca algorithms, a single split_every default is unlikely to suit all cases. Take-away: We should indeed support a distinct "lower-limit" on split_every.
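For intuition on why split_every is hard to pick: in an ACA-style tree reduction it is the fan-in at each combine level, so it directly trades tree width against tree depth. A quick sketch (plain Python; `reduction_rounds` is a hypothetical helper for illustration, not dask API):

```python
def reduction_rounds(npartitions, split_every):
    # Number of combine rounds in a tree reduction with fan-in split_every:
    # each round shrinks npartitions by a factor of split_every (rounded up).
    rounds = 0
    while npartitions > 1:
        npartitions = -(-npartitions // split_every)  # ceiling division
        rounds += 1
    return rounds

print(reduction_rounds(1000, 8))  # 4 rounds: wide, shallow tree
print(reduction_rounds(1000, 2))  # 10 rounds: narrow, deep tree
```

Note that, as discussed later in this thread, the shuffle-based path uses split_every more as a repartitioning granularity than as a tree-reduction width, so the right lower limit may differ between the two algorithms.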
jrbourbeau
left a comment
Just checking in here, it looks like there's good agreement -- is this safe to merge?
I think the only sticking point for me is that I'd like to avoid using shuffle="disk" as the default.
Disk is much better for the local scheduler. It has the same scalability and out-of-memory properties as the p2p shuffle.
This is probably true for larger-than-memory data, but I definitely see a significant performance regression for "large" data (5-6GB) that still fits comfortably in memory:

```python
from dask.datasets import timeseries

ddf = timeseries(end='2002-01-01', id_lam=1e12)

%time ddf.groupby("id").agg({"x": "mean"}, split_out=4, shuffle=False).compute()
# Wall time: 12.9 s

%time ddf.groupby("id").agg({"x": "mean"}, split_out=4, shuffle="tasks").compute()
# Wall time: 13.2 s

%time ddf.groupby("id").agg({"x": "mean"}, split_out=4, shuffle="disk").compute()
# Wall time: 31 s
```

Generally speaking, the shuffle-vs-aca decision is a bit more complicated for the threaded scheduler.
From my end, this is ready. Is there anything else to be done from your perspective @rjzamora?
I expect this to conflict significantly with the proposed alternative in #9446, though this PR has much less ambition. I'm just trying to select some defaults that are likely to have better performance based on some of the investigations in #9406 (comment)
I feel pretty good about the choice of having shuffle on-by-default when split_out > 1. I'm less sure about what to do about split_every. When we have fewer, larger partitions, we probably want it to be close to one. When we have many smaller partitions, it probably makes sense for it to be eight or sixteen, as @rjzamora was finding (note that in this context split_every is more of a repartitioning granularity than a tree-reduction width). I am not sure if there really is a great solution short of the things that @rjzamora is trying in #9446, since we don't typically know partition sizes ahead of time. (Though we could in some instances, like with parquet!)
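As background for why split_out pairs naturally with a shuffle: in a shuffle-based groupby, each row's output partition is chosen by hashing its group key, so split_out is exactly the output granularity. A minimal sketch (plain Python; `output_partition` is hypothetical and `zlib.crc32` is just a deterministic stand-in for dask's own hashing):

```python
import zlib

def output_partition(key, split_out):
    # Stable hash so equal keys always land in the same output partition;
    # dask uses its own hashing internally, crc32 is only illustrative.
    return zlib.crc32(str(key).encode()) % split_out

# Every row with the same groupby key maps to one of split_out partitions,
# which is why a full shuffle lets each output partition be aggregated
# independently, with no tree reduction across partitions.
parts = {k: output_partition(k, 4) for k in ["a", "b", "c"]}
print(parts)
```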