[REVIEW] Generalize rearrange_by_column_tasks and add DataFrame.shuffle#6066
TomAugspurger merged 20 commits into dask:master
Conversation
It looks like this hasn't received any review over the past few weeks. My apologies @rjzamora. Is this still live?
Thanks for the ping @mrocklin - I have been meaning to revisit and revise this. Overall, it would be wonderful to have a generalized `rearrange_by_column_tasks`. It would also be nice to introduce a mechanism to use the above memory optimization in the merge code path. However, I certainly understand if you would rather …
Following the comments in #6133, I decided to add the … In order to allow dask_cudf to minimize the cost of the …
TomAugspurger
left a comment
Going through the `shuffle_dtype` changes now. It seems like an implementation detail that's leaking through to the public API. Is there any reason to not use `shuffle_dtype=False` when using hash-based partitioning?
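For readers following along, the hash-based partitioning being discussed can be sketched as below. This is a hypothetical illustration (using `zlib.crc32` as a stand-in for dask's real hashing; the names here are not dask's), showing both why equal keys always land in the same output partition and why the assignment column can use a small integer dtype:

```python
# Hypothetical sketch of hash-based partition assignment; zlib.crc32
# stands in for dask's real hashing function.
import zlib

def assign_partitions(keys, npartitions):
    """Map each key to an output partition via a stable hash."""
    return [zlib.crc32(str(k).encode()) % npartitions for k in keys]

parts = assign_partitions(["a", "b", "a", "c"], npartitions=4)
assert parts[0] == parts[2]            # equal keys land in the same partition
assert all(0 <= p < 4 for p in parts)  # assignments are valid partition ids
# With npartitions < 2**16, every assignment fits in a uint16 column,
# which is why a minimal dtype for the assignment column can save memory.
```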
dask/dataframe/core.py (Outdated)

```python
""" Rearrange DataFrame into new partitions by index

Uses hashing to map rows to output partitions. After this operation,
rows with the same index element(s) will be in the same partition.
```
It's implied, but maybe mention that the result will have unknown divisions?
Ah, I see you mention that in the Notes. That's fine too.
Thanks for reviewing Tom!
There may be cases in which it is more performant to create the `"_partitions"` column. Simple benchmark as motivation:

```python
from dask.distributed import LocalCluster, Client, wait
from dask.datasets import timeseries

cluster = LocalCluster(n_workers=8)
client = Client(cluster)
ddf = timeseries(start='2000-01-01', end='2000-12-31', partition_freq='1d')

# Default
%timeit wait(ddf.shuffle("id", shuffle="tasks", shuffle_dtype=None).persist())
# Minimal dtype
%timeit wait(ddf.shuffle("id", shuffle="tasks", shuffle_dtype="uint16").persist())
# No "_partitions"
%timeit wait(ddf.shuffle("id", shuffle="tasks", shuffle_dtype=False).persist())
```

- Default: …
- Minimal dtype: …
- No `"_partitions"`: …

I am happy to avoid the creation of `"_partitions"` (whenever possible) if it seems reasonable to others :)
Good observation @TomAugspurger - We definitely do not get balanced partitions when the number of uniques in the shuffle column is small:

```python
print(ddf.npartitions, len(ddf["id"].unique()))
```

Output: …

For this reason, I think it makes sense that the output will be unbalanced. You will see much better balance if you do something like:

```python
ddf = timeseries(start='2000-01-01', end='2000-12-31', partition_freq='1d', id_lam=100_000_000)
```

Perhaps we should add a note to the …
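The imbalance described above can be illustrated with a small standalone sketch (again using `zlib.crc32` as a stand-in for dask's hashing; this is not dask code). Each unique key lands entirely in one bucket, so with few uniques some partitions necessarily receive multiple keys' worth of rows while others may receive none; with many uniques the bucket sizes even out:

```python
import zlib
from collections import Counter

npartitions = 8

def bucket_counts(nkeys):
    """Count how many distinct keys hash into each of the output buckets."""
    return Counter(zlib.crc32(str(k).encode()) % npartitions for k in range(nkeys))

few = bucket_counts(10)        # 10 uniques into 8 buckets: collisions guaranteed
many = bucket_counts(100_000)  # many uniques: buckets become roughly uniform

print(sorted(few.values(), reverse=True))
print(sorted(many.values(), reverse=True))
```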
fjetter
left a comment
I agree with the remark of @TomAugspurger about `shuffle_dtype` being part of the public API, and if it is actually included in the public API there should also be a remark about which scenarios would benefit from creating the column. I'm struggling to come up with a scenario which would benefit from this. If this is very rare, I would suggest not exposing this parameter (internally it's fine if it makes a difference, e.g. for `set_index`).
@TomAugspurger the non-uniformity of the buckets is something which didn't change here, did it? The hash bucketing logic is effectively the same, isn't it?
Agreed - The cleanest change is probably to simplify the public API, and to always avoid creating the `"_partitions"` column.
Right - These changes will not actually change the hash statistics. It just changes when hashing is performed (and exposes a new …)
Note: I will need to revise the changes here a bit to align with #6137
Update: …
TomAugspurger
left a comment
Thanks for working on this. Overall it's looking nice.
IIUC, this hasn't changed the data model at all: there's no indication on the object that the dataframe is now partitioned by one or more columns. Should we? For index-based partitioning, we have `.divisions`, and known divisions are reflected in the repr. Or perhaps that doesn't make sense to add, since we don't know the hashed values like we know the divisions? I haven't thought through this fully.
Right - I do think it ultimately makes sense to expand the dask.dataframe data model to allow the …
Yeah, agreed that we can't have a …
TomAugspurger
left a comment
This looks good.
@rjzamora do you have strong thoughts on if we should / how to expose (hash-based) column partitioning in the data model? I'd like to have an issue to collect discussion on this topic, but I'm still working through it in my head. If you don't have strong thoughts then I'll assign myself a task to write up an issue.
No strong thoughts from me, but I will be very happy to participate in a discussion :)... Without giving it too much thought, I think we will probably want to allow the …
Looks like failures are unrelated.
Yep, those were just fixed on master. I'll merge this and open an issue on the data model things. |

Following the discussion in #5741 (particularly in/after this comment), this draft PR generalizes the `rearrange_by_column_tasks` implementation to handle hash-based shuffling without the existence of a `"_partitions"` column (including support for multiple shuffle-index columns).

TODO:

- Include benchmarking numbers for both cudf- and pandas-backed operations
- Tests added / passed
- Passes `black dask` / `flake8 dask`