
[WIP] Add "tasks-hash" option for join/merge routines #5741

Closed
rjzamora wants to merge 7 commits into dask:master from rjzamora:hash_shuffle

Conversation

@rjzamora
Member

This is also motivated by the deep-copy overhead discussed in #5739

rearrange_by_column_tasks is already capable of using hashing to split dataframe partitions (avoiding the need to assign a temporary "_partitions" column). This PR just exposes this logic to the merge/join routines by allowing users to pass shuffle="tasks-hash".
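The core idea can be illustrated outside of dask entirely: instead of precomputing and storing a "_partitions" column alongside the data, the output partition of each row is derived directly by hashing the join key. The sketch below is a toy illustration (the `hash_partition` helper is hypothetical, not dask code):

```python
import zlib

def hash_partition(rows, key, npartitions):
    """Assign each row to an output partition by hashing its key value."""
    buckets = [[] for _ in range(npartitions)]
    for row in rows:
        h = zlib.crc32(str(row[key]).encode())
        buckets[h % npartitions].append(row)
    return buckets

# Rows with duplicate keys: a hash join needs matching keys co-located.
rows = [{"id": i % 3, "x": i} for i in range(9)]
buckets = hash_partition(rows, "id", 4)
# Each distinct key lands in exactly one bucket, with no extra column stored.
for k in (0, 1, 2):
    holders = [b for b in buckets if any(r["id"] == k for r in b)]
    assert len(holders) == 1
```

Because the bucket assignment is a pure function of the key, every worker computes the same mapping independently, which is what lets the shuffle skip the temporary column.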

This change provides the same performance improvement for the cudf merge benchmark as #5740 (which may have "correctness" problems):

AFTER Changes:

Merge benchmark
--------------------------
Chunk-size  | 100000000
Frac-match  | 0.3
Ignore-size | 1.05 MB
Protocol    | ucx
Device(s)   | 0,1,2,3
rmm-pool    | True
tcp         | True
ib          | False
nvlink      | True
==========================
Total time  | 716.79 ms
Total time  | 709.37 ms
Total time  | 673.95 ms
==========================

Worker-Worker Transfer Rates
(w1,w2)     | 25% 50% 75% (total nbytes)
--------------------------
(00,01)     | 4.63 GB/s 5.53 GB/s 7.03 GB/s (11.40 GB)
(00,02)     | 4.91 GB/s 5.16 GB/s 5.52 GB/s (9.00 GB)
(00,03)     | 5.22 GB/s 5.77 GB/s 6.14 GB/s (6.60 GB)
(01,00)     | 4.38 GB/s 5.20 GB/s 6.68 GB/s (11.40 GB)
(01,02)     | 5.04 GB/s 5.72 GB/s 5.80 GB/s (6.60 GB)
(01,03)     | 4.33 GB/s 5.36 GB/s 6.78 GB/s (9.00 GB)
(02,00)     | 5.33 GB/s 5.60 GB/s 7.13 GB/s (9.00 GB)
(02,01)     | 5.87 GB/s 6.19 GB/s 7.02 GB/s (6.60 GB)
(02,03)     | 5.78 GB/s 5.94 GB/s 6.39 GB/s (9.00 GB)
(03,00)     | 5.33 GB/s 5.78 GB/s 6.13 GB/s (6.60 GB)
(03,01)     | 4.41 GB/s 6.04 GB/s 6.70 GB/s (9.00 GB)
(03,02)     | 5.41 GB/s 5.69 GB/s 6.07 GB/s (9.00 GB)

BEFORE Changes:

Merge benchmark
--------------------------
Chunk-size  | 100000000
Frac-match  | 0.3
Ignore-size | 1.05 MB
Protocol    | ucx
Device(s)   | 0,1,2,3
rmm-pool    | True
tcp         | True
ib          | False
nvlink      | True
==========================
Total time  | 1.59 s
Total time  | 1.85 s
Total time  | 1.79 s
==========================

Worker-Worker Transfer Rates
(w1,w2)     | 25% 50% 75% (total nbytes)
--------------------------
(00,01)     | 3.36 GB/s 4.20 GB/s 5.54 GB/s (6.50 GB)
(00,02)     | 4.16 GB/s 4.55 GB/s 4.84 GB/s (8.90 GB)
(00,03)     | 3.97 GB/s 5.10 GB/s 6.93 GB/s (11.30 GB)
(01,00)     | 3.74 GB/s 4.15 GB/s 5.54 GB/s (7.20 GB)
(01,02)     | 3.75 GB/s 4.73 GB/s 6.42 GB/s (10.30 GB)
(01,03)     | 4.30 GB/s 5.22 GB/s 6.56 GB/s (11.00 GB)
(02,00)     | 4.17 GB/s 4.76 GB/s 6.55 GB/s (9.60 GB)
(02,01)     | 4.78 GB/s 5.86 GB/s 6.68 GB/s (12.00 GB)
(02,03)     | 4.92 GB/s 5.76 GB/s 6.14 GB/s (7.20 GB)
(03,00)     | 3.60 GB/s 5.56 GB/s 7.38 GB/s (10.30 GB)
(03,01)     | 3.50 GB/s 4.68 GB/s 5.75 GB/s (9.60 GB)
(03,02)     | 4.39 GB/s 4.86 GB/s 6.59 GB/s (10.30 GB)
  • Tests added / passed
  • Passes black dask / flake8 dask

@mrocklin
Member

Adding a new shuffle mechanism seems very expensive to me from a maintenance perspective (all dataframe maintainers will have to be educated about why this is important eventually). If we can get #5740 to work instead then that would be great.

@rjzamora
Member Author

Adding a new shuffle mechanism seems very expensive to me from a maintenance perspective (all dataframe maintainers will have to be educated about why this is important eventually). If we can get #5740 to work instead then that would be great.

Completely agree. I just wanted to open a draft PR to document this as a possible alternative.

@rjzamora rjzamora closed this Feb 19, 2020
@rjzamora
Member Author

rjzamora commented Apr 2, 2020

@mrocklin - We currently have a rearrange_by_hash shuffle mechanism in dask_cudf. The public dask_cudf API that uses this mechanism is DataFrame.repartition() (i.e., a new columns= argument can be used to specify the list of columns for hash-based repartitioning). This repartitioning approach does not produce an ordered index, and thus does not result in known divisions. However, we have found this algorithm to be more or less indispensable for certain workloads.

In order to reduce memory pressure (a serious concern for us), we explicitly avoid creating the "_partitions" column within rearrange_by_hash. In this sense, we are essentially doing the same thing as the shuffle="tasks-hash" code path added in this PR.

The reason I am bringing this feature up here is: (1) our long-term goal is to get the (multi-column) hash-based repartitioning feature moved into upstream Dask, and (2) we are very motivated to modify the upstream join/merge algorithm to avoid the creation of the "_partitions" column. Our reasoning for (2) is no longer the deep-copy overhead (which has been addressed), but the fact that carrying around an extra column incurs significant memory/communication overhead at scale.

My original plan was to implement something like rearrange_by_hash in dask.dataframe, and to simply change the hash_join code to use this new algorithm (rather than shuffle.shuffle). However, I am a bit fearful that there will be resistance to the addition of new logic in dask.dataframe.shuffle. Do you have any thoughts/advice on the best path forward?

cc @VibhuJawa @randerzander

@TomAugspurger
Member

Thanks for the status update.

FWIW, I would personally be interested in (optionally) decoupling partitioning from the index. I could imagine an update to the dask.dataframe data model to support (hash) partitioning on a column. This would let us optimize things like boolean filters on the partition column:

df2 = df.repartition(column="id")
subset = df2[df2.id == "A"]  # only needs to scan partitions where `id == A`.

While updating the data model like this is a big project, a function like rearrange_by_hash is a (necessary?) step towards it. Hopefully that goal would be enough to assuage concerns about additional logic in shuffle :)
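The pruning optimization Tom describes can be sketched in plain Python (this is a toy illustration under the assumption that the frame is already hash-partitioned on "id"; the `partition_of` helper is hypothetical):

```python
import zlib

def partition_of(value, npartitions):
    """Deterministic bucket for a key value under hash partitioning."""
    return zlib.crc32(str(value).encode()) % npartitions

npartitions = 8
rows = [{"id": name, "x": i} for i, name in enumerate(["A", "B", "C", "A", "B"])]

# Build hash-partitioned buckets (what a columns= repartition would produce).
buckets = {i: [] for i in range(npartitions)}
for row in rows:
    buckets[partition_of(row["id"], npartitions)].append(row)

# To evaluate `id == "A"`, scan only the one bucket that could hold "A".
target = partition_of("A", npartitions)
matches = [r for r in buckets[target] if r["id"] == "A"]
assert len(matches) == 2  # both "A" rows, found without a full scan
```

Once the data model records that a frame is hash-partitioned on "id", a filter on that column can skip every other partition entirely.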

@rjzamora
Member Author

rjzamora commented Apr 3, 2020

Thanks for the feedback @TomAugspurger!

I will submit a draft PR with some minor changes to the shuffle code. The existing rearrange_by_column_tasks can (almost) already handle hash-based repartitioning with or without multiple index columns, so there is no need to add a new rearrange_by_hash function. Instead, the shuffle_group and shuffle_group_2 utility functions can be expanded to perform hashing on demand (when the shuffle index is one or more columns other than "_partitions").
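A rough sketch of what "hashing on demand" could look like (this is not the actual dask source; `shuffle_group_sketch` is a hypothetical stand-in, using pandas' real `pd.util.hash_pandas_object` utility):

```python
import pandas as pd

def shuffle_group_sketch(df, cols, npartitions):
    """Split df into npartitions pieces, hashing `cols` on demand."""
    if cols == ["_partitions"]:
        ind = df["_partitions"]  # precomputed-column path (today's default)
    else:
        # Hash-on-demand path: no temporary column is ever materialized.
        ind = pd.util.hash_pandas_object(df[cols], index=False) % npartitions
    return {i: df[ind == i] for i in range(npartitions)}

df = pd.DataFrame({"id": list("abcabc"), "x": range(6)})
groups = shuffle_group_sketch(df, ["id"], 3)
# Rows with equal keys always land in the same output group.
assert sum(len(g) for g in groups.values()) == len(df)
```

Because the partition id is computed inside the grouping function, nothing extra is carried through the graph or over the network.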

@mrocklin
Member

mrocklin commented Apr 3, 2020

we are very motivated to modify the up-stream join/merge algorithm to avoid the creation of the "_partitions" column. Our reasoning for (2) is no longer the deep-copy overhead (which has been addressed), but the fact that the need to carry around an extra column incurs significant memory/communication overhead at scale

Would this be mostly resolved by making _partitions a smaller dtype? I think that we currently use uint64 by default, but if we were careful we could shrink this down to uint8 or uint16 most of the time, which would likely reduce the scale of this problem significantly. We used to do this, but ran into corner cases, so we defaulted to a larger dtype. If folks are around to identify and work through the corner cases, though, then this might be a decent solution: if you have fewer than 256 partitions, we can shrink the memory use by 8x.
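The dtype-shrinking idea is mechanical to sketch (a minimal illustration; the `minimal_partitions_dtype` helper is hypothetical, not dask code):

```python
import numpy as np

def minimal_partitions_dtype(npartitions):
    """Smallest unsigned integer dtype that can label 0..npartitions-1."""
    for dtype in (np.uint8, np.uint16, np.uint32):
        if npartitions <= np.iinfo(dtype).max + 1:
            return dtype
    return np.uint64

assert minimal_partitions_dtype(256) == np.uint8      # 8x smaller than uint64
assert minimal_partitions_dtype(257) == np.uint16
assert minimal_partitions_dtype(100_000) == np.uint32
```

The corner cases mentioned above would presumably show up wherever intermediate arithmetic on the column can overflow the narrow type, so any such change would need careful auditing of the shuffle code paths.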

@mrocklin
Member

mrocklin commented Apr 3, 2020

To be clear, when you say "repartitioning", do you mean that you're just moving the boundaries of the partitions around without changing the order of rows, as is done in the repartition function, or are you moving all of the rows so that related rows are grouped together? If it's the latter, I'm going to suggest that we call this shuffling rather than repartitioning, to avoid a naming collision within dask.dataframe nomenclature.

@mrocklin
Member

mrocklin commented Apr 3, 2020

I am a bit fearful that there will be resistence to the addition of new logic in dask.dataframe.shuffle

Without thinking too much about this (sorry, I think it'll probably take me a while to come up to speed, and I'm short on time these days), I think that additions of large, complex bodies of code to the shuffle codebase would need to be backed up by large improvements to general and common workflows.

Submitting a PR is always welcome because it makes it more clear what the long term cost is. I'm sorry that I'm unable to give a sense for whether this will likely be accepted ahead of time.

@mrocklin
Member

mrocklin commented Apr 3, 2020

@TomAugspurger I'm not sure I understand how this is a step towards separating partitioning from the index.

@mrocklin mrocklin reopened this Apr 3, 2020
@mrocklin mrocklin closed this Apr 3, 2020
@rjzamora
Member Author

rjzamora commented Apr 3, 2020

Would this be mostly resolved by making _partitions a smaller dtype? I think that we currently use uint64 by default, but if we were careful we could shrink this down to uint8 or uint16 most of the time, which would likely reduce the scale of this problem significantly. We used to do this, but ran into corner cases, so we defaulted to a larger dtype. If folks are around to identify and work through the corner cases, though, then this might be a decent solution: if you have fewer than 256 partitions, we can shrink the memory use by 8x.

Good suggestion. This certainly helps, but I had trouble going below int32 in the past. I'll experiment with this again and collect performance numbers to compare with the "_partitions"-free approach.

To be clear, when you say "repartitioning", do you mean that you're just moving the boundaries of the partitions around without changing the order of rows, as is done in the repartition function, or are you moving all of the rows so that related rows are grouped together? If it's the latter, I'm going to suggest that we call this shuffling rather than repartitioning, to avoid a naming collision within dask.dataframe nomenclature.

Yes - I am not particularly attached to this functionality being exposed within the repartitioning API. We are not repartitioning in the same sense as the other Dask options. Rather, we are shuffling the data to get unique keys, for a specified set of columns, into the same partition. The inclusion of this feature as a columns= argument to repartition was actually chosen to align with very similar naming/functionality in Spark. There are dask_cudf users who like this name/API, but I doubt this is a sticking point on our end.

@TomAugspurger I'm not sure I understand how this is a step towards separating partitioning from the index.

I believe Tom is generally referring to the effort to implement a public API for the shuffling/partitioning of DataFrames by non-index columns. The key functionality is more or less already in place; the bigger question is really "How should we expose this functionality to the user?"

@mrocklin
Member

mrocklin commented Apr 3, 2020

The inclusion of this feature as a columns= argument to repartition was actually chosen to align with very similar naming/functionality in Spark. There are dask_cudf users who like this name/API, but I doubt this is a sticking point on our end.

If this is the API that you want then I would encourage you to raise an issue specifically around it. Hopefully other dataframe folks can get involved on deciding what is best there.
