Description
I imagine that operations like these are somewhat common:
```python
import dask.dataframe as dd

# ddf1..ddf4 are existing Dask DataFrames; custom_func is a user function
merged = dd.merge(ddf1, ddf2, on=["a", "b"])
result = merged.groupby(["a", "b"]).apply(custom_func)
```

```python
ab = dd.merge(ddf1, ddf2, on=["a", "b"])
cd = dd.merge(ddf3, ddf4, on=["a", "b"])
final = dd.merge(ab, cd)
```

In these cases, there's a much faster path we could take in groupby and merge if we knew that the input was already partitioned by ["a", "b"], even if we didn't know its divisions.
We just need to know that the partitions are already split along the index/columns in question; that is, any given value of the group will be found in exactly one partition. In that case groupby could become just a `map_partitions` (#8361), and merge already works this way internally once both sides have been shuffled.
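To make the invariant concrete, here is a minimal pure-Python sketch that models a DataFrame as a list of partitions, each a list of row dicts. The names `is_partitioned_by`, `groupby_sum` and `groupby_sum_partitionwise` are hypothetical and not part of Dask; in Dask itself the partition-wise step would look roughly like `ddf.map_partitions(lambda pdf: pdf.groupby(["a", "b"]).apply(custom_func))`. The sketch checks that each key occurs in only one partition and shows that, when it does, grouping each partition independently gives the same answer as a global groupby.

```python
from collections import defaultdict

# Toy model (hypothetical, not Dask): a frame is a list of partitions,
# and each partition is a list of row dicts.


def is_partitioned_by(partitions, keys):
    """Return True if every key tuple occurs in at most one partition."""
    owner = {}  # key tuple -> index of the first partition it appeared in
    for i, part in enumerate(partitions):
        for row in part:
            k = tuple(row[c] for c in keys)
            if owner.setdefault(k, i) != i:
                return False
    return True


def groupby_sum(rows, keys, col):
    """Ordinary in-memory groupby-sum over one partition."""
    out = defaultdict(int)
    for row in rows:
        out[tuple(row[c] for c in keys)] += row[col]
    return dict(out)


def groupby_sum_partitionwise(partitions, keys, col):
    """Shuffle-free path: group each partition independently (like
    map_partitions) and concatenate. Only valid if is_partitioned_by holds."""
    result = {}
    for part in partitions:
        result.update(groupby_sum(part, keys, col))
    return result


parts = [
    [{"a": 1, "b": 1, "x": 10}, {"a": 1, "b": 1, "x": 5}],
    [{"a": 2, "b": 1, "x": 7}, {"a": 1, "b": 2, "x": 3}],
]
assert is_partitioned_by(parts, ["a", "b"])
print(groupby_sum_partitionwise(parts, ["a", "b"], "x"))
# {(1, 1): 15, (2, 1): 7, (1, 2): 3}
```

If the check fails, a key split across partitions would produce separate partial results, and `groupby_sum_partitionwise` would silently keep only one of them via `dict.update`; that is why a shuffle is normally required.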
What if we had another attribute like `divisions`, which tracked which column(s)/index a DataFrame was partitioned by, if any?
- If `df.known_divisions`, then `df.partitioned_by` would always be known.
- If `df.partitioned_by` is known, `df.divisions` might still be unknown. This would be the case after any shuffle-based operation, such as a merge.
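The two rules above could be captured by a small metadata object. This is a hypothetical sketch, not Dask's API: `PartitionMeta`, `INDEX` and `after_shuffle` are illustrative names, and treating known divisions as implying partitioning by the index follows the first bullet. Real Dask represents unknown divisions as a tuple of `None`s; the sketch simplifies that to a single `None`.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple

INDEX = "<index>"  # hypothetical marker meaning "partitioned by the index"


@dataclass(frozen=True)
class PartitionMeta:
    """Hypothetical metadata tracked alongside a collection.

    divisions: sorted index boundaries, or None if unknown.
    partitioned_by: columns (or INDEX) such that each distinct value lives
        in exactly one partition, or None if unknown.
    """

    divisions: Optional[Tuple] = None
    partitioned_by: Optional[Tuple[str, ...]] = None

    def __post_init__(self):
        # Rule 1: known divisions imply the data is partitioned by the index.
        if self.known_divisions and self.partitioned_by is None:
            object.__setattr__(self, "partitioned_by", (INDEX,))

    @property
    def known_divisions(self) -> bool:
        return self.divisions is not None


def after_shuffle(on: Sequence[str]) -> PartitionMeta:
    # Rule 2: a hash shuffle on `on` (as inside a merge) places each key in
    # one partition but does not yield sorted divisions.
    return PartitionMeta(divisions=None, partitioned_by=tuple(on))


sorted_meta = PartitionMeta(divisions=(0, 10, 20))
print(sorted_meta.partitioned_by)  # ('<index>',)
merged_meta = after_shuffle(["a", "b"])
print(merged_meta.known_divisions, merged_meta.partitioned_by)  # False ('a', 'b')
```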
Then methods like groupby and merge could use this information to offer embarrassingly parallel paths in these cases.
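For merge, the fast path only applies when both sides are partitioned by the join keys in the same way. The toy sketch below (same list-of-row-dicts model, hypothetical names throughout, not Dask's API) skips the shuffle when both inputs report `partitioned_by == on` and have the same number of partitions. It assumes every `partitioned_by` value was produced by the same `hash_shuffle` function; a real implementation would also need to confirm that both sides used the same partitioning scheme, since two frames partitioned by the same columns can still place a given key in different partitions.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]
Partition = List[Row]


def key_of(row: Row, on: Tuple[str, ...]) -> tuple:
    return tuple(row[c] for c in on)


def hash_shuffle(partitions: List[Partition], on: Sequence[str], npartitions: int) -> List[Partition]:
    """Send each row to partition hash(key) % npartitions."""
    on = tuple(on)
    out: List[Partition] = [[] for _ in range(npartitions)]
    for part in partitions:
        for row in part:
            out[hash(key_of(row, on)) % npartitions].append(row)
    return out


def merge_partition(left: Partition, right: Partition, on: Tuple[str, ...]) -> Partition:
    """Inner join of two in-memory partitions (a simple hash join)."""
    lookup: Dict[tuple, List[Row]] = {}
    for rrow in right:
        lookup.setdefault(key_of(rrow, on), []).append(rrow)
    return [{**lrow, **rrow} for lrow in left for rrow in lookup.get(key_of(lrow, on), [])]


def merge(left, right, on, npartitions, left_by=None, right_by=None):
    """Return (partitions, partitioned_by, shuffled)."""
    on = tuple(on)
    # Fast path only if both sides are already hash-partitioned by `on`
    # into the same number of partitions (assumes the same hash_shuffle).
    aligned = (
        left_by == on and right_by == on
        and len(left) == len(right) == npartitions
    )
    if not aligned:
        left = hash_shuffle(left, on, npartitions)
        right = hash_shuffle(right, on, npartitions)
    # Either way, the join itself is embarrassingly parallel: partition i of
    # the left side only needs partition i of the right side.
    parts = [merge_partition(lp, rp, on) for lp, rp in zip(left, right)]
    return parts, on, not aligned


frame1 = [[{"a": 1, "b": 1, "x": 1}], [{"a": 2, "b": 2, "x": 2}]]
frame2 = [[{"a": 2, "b": 2, "y": 20}, {"a": 1, "b": 1, "y": 10}]]
ab, ab_by, shuffled = merge(frame1, frame2, ["a", "b"], npartitions=2)
print(shuffled)  # True: the inputs carried no partitioning info

frame3 = hash_shuffle([[{"a": 1, "b": 1, "z": 100}]], ["a", "b"], 2)
final, _, shuffled = merge(ab, frame3, ["a", "b"], npartitions=2,
                           left_by=ab_by, right_by=("a", "b"))
print(shuffled)  # False: both sides were already partitioned by ("a", "b")
```

The second call mirrors `final = dd.merge(ab, cd)` from the example at the top: because `ab` comes out of a shuffle-based merge already partitioned by the join keys, it can be joined partition by partition without shuffling it again.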
I know that maintaining divisions for multiple columns has been discussed before, but this seems like a much easier change to make initially, since we wouldn't be changing the meaning of current divisions. We could add the field, then add support for it in other methods gradually.
cc @ian-r-rose @jsignell @rjzamora @jrbourbeau
Very related to:
- Non-index-based partitioning of Dask DataFrames #6246 (somewhat the same as this)
- Full support for multiindex in dataframes #1493