[WIP] Add "tasks-hash" option for join/merge routines #5741
rjzamora wants to merge 7 commits into dask:master
Conversation
Adding a new shuffle mechanism seems very expensive to me from a maintenance perspective (all dataframe maintainers will have to be educated about why this is important eventually). If we can get #5740 to work instead then that would be great.
Completely agree - I just wanted to open a draft PR to document this as a possible alternative.
@mrocklin - We currently have a multi-column hash-based repartitioning mechanism of our own. In order to reduce memory pressure (a serious concern for us), we explicitly avoid the creation of the temporary `"_partitions"` column. The reason I am bringing this feature up here is: (1) our long-term goal is to get the (multi-column) hash-based repartitioning feature moved into up-stream dask, and (2) we are very motivated to modify the up-stream join/merge algorithm to avoid the creation of the `"_partitions"` column. My original plan was to implement something like the `shuffle="tasks-hash"` option proposed in this PR.
Thanks for the status update. FWIW, I would personally be interested in (optionally) decoupling partitioning from the index. I could imagine an update to the dask.dataframe data model to support (hash) partitioning on a column. This would let us optimize things like boolean filters on the partition column:

```python
df2 = df.repartition(column="id")
subset = df2[df2.id == "A"]  # only needs to scan partitions where `id == A`
```

While updating the data model like this is a big project, a function like this would be a step in that direction.
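The pruning idea can be sketched in plain Python (illustrative only; `repartition(column=...)` above is a hypothetical API, not something dask currently exposes):

```python
from collections import defaultdict

def hash_partition(rows, column, npartitions):
    """Group rows into partitions by the hash of `row[column]`."""
    parts = defaultdict(list)
    for row in rows:
        parts[hash(row[column]) % npartitions].append(row)
    return parts

rows = [{"id": k, "x": i} for i, k in enumerate("ABCABCA")]
parts = hash_partition(rows, "id", npartitions=4)

# A filter like `df2[df2.id == "A"]` only needs to scan the single
# partition that the value "A" hashes to; the rest can be skipped.
target = hash("A") % 4
subset = [r for r in parts[target] if r["id"] == "A"]
```

Because the partitioning is a pure function of the column value, the scheduler could know ahead of time which partitions can possibly contain a given key.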
Thanks for the feedback @TomAugspurger! I will submit a draft PR with some minor changes to the shuffle code. The existing `rearrange_by_column_tasks` logic is already capable of hash-based splitting.
Would this be mostly resolved by making the `"_partitions"` column a smaller integer dtype?
To be clear, when you say "repartitioning" do you mean that you're just moving the boundaries of the partitions around, but not changing the order of rows, as is done in the repartition function, or are you moving around all of the rows so that things are grouped together with related rows? If so, I'm going to suggest that we call this shuffling rather than repartitioning, to avoid a naming collision within dask.dataframe nomenclature.
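The distinction can be sketched in plain Python (illustrative, not dask code): repartitioning moves boundaries without reordering rows, while shuffling regroups rows by key.

```python
rows = ["A", "B", "A", "C", "B", "A"]

# "repartition": new partition boundaries; global row order preserved.
repartitioned = [rows[:2], rows[2:4], rows[4:]]

# "shuffle": rows moved so that equal keys land in the same partition;
# global row order is not preserved.
nparts = 2
shuffled = [[r for r in rows if hash(r) % nparts == p] for p in range(nparts)]
```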
Without thinking too much about this (sorry, I think it'll probably take me a while to come up to speed, and I'm short on time these days) I think that additions of large complex bodies of code to the shuffle codebase would need to be backed up with large improvements to general and common workflows. Submitting a PR is always welcome because it makes it more clear what the long term cost is. I'm sorry that I'm unable to give a sense for whether this will likely be accepted ahead of time.
@TomAugspurger I'm not sure I understand how this is a step towards separating partitioning from the index. |
Good suggestion - This certainly helps, but I had trouble going below int32 in the past. I'll experiment with this again and collect performance numbers to compare with the "_partitions"-free approach.
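For scale, a rough sketch of the memory at stake in that temporary column (per-million-row footprints, assuming numpy; illustrative numbers only):

```python
import numpy as np

n = 1_000_000  # rows covered by one partition-assignment column
int64_bytes = np.zeros(n, dtype="int64").nbytes  # 8 MB per million rows
int32_bytes = np.zeros(n, dtype="int32").nbytes  # 4 MB per million rows
int8_bytes = np.zeros(n, dtype="int8").nbytes    # 1 MB, but only addresses < 128 partitions
```

The trade-off is that a smaller dtype caps the number of output partitions it can address, which may be why going below int32 was troublesome.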
Yes - I am not particularly attached to this functionality being exposed within the repartitioning API. We are not repartitioning in the same sense as the other Dask options. Rather, we are shuffling the data to get unique keys, for a specified set of columns, into the same partition. The inclusion of this feature as a `shuffle="tasks-hash"` option is just one possible way to expose it.
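A minimal sketch of the multi-column case described above (plain Python, not the dask implementation): rows with the same tuple of key-column values always map to the same partition.

```python
def partition_of(row, columns, nparts):
    """Partition index for a row, hashing the tuple of its key columns."""
    return hash(tuple(row[c] for c in columns)) % nparts

rows = [
    {"a": 1, "b": "x", "v": 10},
    {"a": 1, "b": "x", "v": 20},  # same (a, b) key as the row above
    {"a": 2, "b": "y", "v": 30},
]
p0 = partition_of(rows[0], ["a", "b"], nparts=8)
p1 = partition_of(rows[1], ["a", "b"], nparts=8)
```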
I believe Tom is generally referring to the effort to implement a public API for the shuffling/partitioning of DataFrames by non-index columns. The key functionality is more-or-less already in place; the bigger question is really "How should we expose this functionality to the user?"
If this is the API that you want, then I would encourage you to raise an issue specifically around it. Hopefully other dataframe folks can get involved in deciding what is best there.
This is also motivated by the deep-copy overhead discussed in #5739
`rearrange_by_column_tasks` is already capable of using hashing to split dataframe partitions (avoiding the need to assign a temporary `"_partitions"` column). This PR just exposes this logic to the `merge`/`join` routines by allowing users to pass `shuffle="tasks-hash"`. This change provides the same performance improvement for the cudf merge benchmark as #5740 (which may have "correctness" problems):
AFTER Changes: Worker-Worker Transfer Rates (benchmark plot omitted)

BEFORE Changes: Worker-Worker Transfer Rates (benchmark plot omitted)
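For intuition, the core idea can be sketched in plain Python (illustrative only, not the actual `rearrange_by_column_tasks` implementation): each task splits its input partition by hashing the key column directly, so no temporary `"_partitions"` column is ever materialized on the data.

```python
def split_by_hash(rows, key, nparts):
    """Split one input partition into `nparts` output shards by key hash."""
    shards = [[] for _ in range(nparts)]
    for row in rows:
        shards[hash(row[key]) % nparts].append(row)
    return shards

partition = [{"id": k, "x": i} for i, k in enumerate("ABCAB")]
shards = split_by_hash(partition, "id", nparts=3)
```

Every task applies the same hash function, so matching keys from the left and right frames of a merge land in the same output shard without any extra column being stored alongside the data.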
- Passes `black dask` / `flake8 dask`