
[REVIEW] Generalize rearrange_by_column_tasks and add DataFrame.shuffle#6066

Merged
TomAugspurger merged 20 commits into dask:master from rjzamora:hash-join
May 26, 2020

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Apr 4, 2020

Following the discussion in #5741 (particularly in/after this comment), this draft PR generalizes the rearrange_by_column_tasks implementation to handle hash-based shuffling without the existence of a "_partitions" column (including support for multiple shuffle-index columns).

TODO:

  • Include benchmarking numbers for both cudf and pandas-backed operations

  • Tests added / passed

  • Passes black dask / flake8 dask

@mrocklin
Member

It looks like this hasn't received any review over the past few weeks. My apologies @rjzamora. Is this still live?

@rjzamora
Member Author

> It looks like this hasn't received any review over the past few weeks. My apologies @rjzamora. Is this still live?

Thanks for the ping @mrocklin - I have been meaning to revisit and revise this.

Overall, it would be wonderful to have a generalized rearrange_by_column_tasks that will actually handle more than one column (and simply perform a hash in both shuffle_group and shuffle_group_2 in lieu of using a "_partitions") column. I feel that dask_cudf is currently duplicating code just to avoid creating/assigning the "_partitions" column. We would definitely prefer to use upstream-Dask for all shuffling. Although the simplicity of the single-column routine is likely best for pandas-backed DataFrame shuffling, we are really hoping to introduce a minimal amount of flexibility here. Since device memory is a valuable resource, we are doing our best to shed every byte we can (to reduce the amount of data we need to spill to host memory).

It would also be nice to introduce a mechanism to use the above memory optimization in the merge code path. However, I certainly understand if you would rather dask_cudf implement/maintain its own wrapper to avoid "_partitions" creation.

@rjzamora rjzamora marked this pull request as ready for review April 27, 2020 16:06
@rjzamora
Member Author

Following the comments in #6133, I decided to add the DataFrame.shuffle API in this PR. (cc @TomAugspurger)

In order to allow dask_cudf to minimize the cost of the "_partitions" column creation/assignment (for workflows with high memory pressure), I added the shuffle_dtype argument in a few places. This argument allows the user to explicitly specify the type of the "_partitions" column. If this argument is set to False, "_partitions" will not be created at all (and hashing will be performed within all local shuffle-group operations).
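To make the trade-off concrete, here is a minimal sketch of the two modes being described. This is purely illustrative (assign_partitions is a hypothetical helper, not dask's implementation); the "_partitions" path materializes an extra column, while the hash-in-place path computes partition ids without ever attaching them to the frame:

```python
import pandas as pd

# Hypothetical sketch (not dask's actual code): assigning rows to output
# partitions by hashing the shuffle column(s).
def assign_partitions(df, cols, npartitions, materialize=True):
    # Hash the shuffle columns to a per-row partition id
    part = pd.util.hash_pandas_object(df[cols], index=False) % npartitions
    if materialize:
        # "_partitions"-column approach: store the ids as an extra column
        # (shuffle_dtype would control the dtype used here)
        return df.assign(_partitions=part.astype("uint16"))
    # shuffle_dtype=False-style approach: use the ids directly,
    # without adding a column to the frame
    return {i: df[part.values == i] for i in range(npartitions)}
```

The memory saving comes from never holding the extra column alongside the data being shuffled.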


@TomAugspurger TomAugspurger left a comment


Going through the shuffle_dtype changes now. It seems like an implementation detail that's leaking through to the public API. Is there any reason to not use shuffle_dtype=False when using hash-based partitioning?

""" Rearrange DataFrame into new partitions by index

Uses hashing to map rows to output partitions. After this operation,
rows with the same index element(s) will be in the same partition.

It's implied, but maybe mention that the result will have unknown divisions?


Ah, I see you mention that in the Notes. That's fine too.

@rjzamora
Member Author

Thanks for reviewing Tom!

> Is there any reason to not use shuffle_dtype=False when using hash-based partitioning?

There may be cases in which it is more performant to create the "_partitions" column, but I haven't encountered one. It seems likely that the "_partitions"-based workflow was used so the shuffle algorithm could be used cleanly with set_index. In my experience, avoiding the additional column is a significant memory and communication benefit (hence my motivation for this PR).

Simple benchmark as motivation:

from dask.distributed import LocalCluster, Client, wait
from dask.datasets import timeseries

cluster = LocalCluster(n_workers=8)
client = Client(cluster)
ddf = timeseries(start='2000-01-01', end='2000-12-31', partition_freq='1d')

# Default
%timeit wait(ddf.shuffle("id", shuffle="tasks", shuffle_dtype=None).persist())

# Minimal dtype
%timeit wait(ddf.shuffle("id", shuffle="tasks", shuffle_dtype="uint16").persist())

# No "_partitions"
%timeit wait(ddf.shuffle("id", shuffle="tasks", shuffle_dtype=False).persist())

Default

18.4 s ± 241 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Minimal dtype

19 s ± 247 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

No "_partitions"

16.8 s ± 124 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

I am happy to avoid the creation of "_partitions" (whenever possible) if it seems reasonable to others :)

@TomAugspurger
Member

Thanks. I noticed that there's a decent amount of variability in the size of the hashed partitions:

[figure: rows per output partition]

Some of that is expected, since the id column isn't uniform. But there's perhaps more unevenness than I'd expect. For example, partition 214 has values from id = [983, 982, 1046].

@rjzamora
Member Author

> Some of that is expected, since the id column isn't uniform. But there's perhaps more unevenness than I'd expect. For example, partition 214 has values from id = [983, 982, 1046].

Good observation @TomAugspurger - We definitely do not get balanced partitions when the number of uniques in the on column(s) is not much larger than the number of output partitions. In the example above, the number of output partitions is likely to be in the same ballpark as the number of unique values in "id":

print(ddf.npartitions, len(ddf["id"].unique()))

Output: 365 321 (Of course, the second number will vary)

For this reason, I think it makes sense that the output will be unbalanced. You will see much better balance if you do something like:

ddf = timeseries(start='2000-01-01', end='2000-12-31', partition_freq='1d', id_lam=100_000_000)

Perhaps we should add a note to the DataFrame.shuffle docstring that the index must have many uniques to produce balanced partitions?
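The effect being discussed is easy to reproduce without dask at all. The sketch below (bucket_cv is a hypothetical helper, purely for illustration) hashes synthetic keys into 365 buckets and compares the relative spread of bucket sizes when the unique-key count is near, versus far above, the partition count:

```python
import pandas as pd

# Illustration only (not dask code): hash-bucket balance depends on how
# many unique keys feed the buckets.
def bucket_cv(nuniques, npartitions, rows_per_key=100):
    keys = pd.Series(range(nuniques)).repeat(rows_per_key)
    part = pd.util.hash_pandas_object(keys, index=False) % npartitions
    sizes = part.value_counts()
    # Coefficient of variation: relative spread of partition sizes
    return sizes.std() / sizes.mean()

# uniques ~ npartitions: very uneven; uniques >> npartitions: nearly even
print(bucket_cv(400, 365), bucket_cv(100_000, 365))
```

With ~400 uniques, each bucket receives a whole number of 100-row key groups, so sizes jump in large steps; with 100,000 uniques the per-bucket key counts average out.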


@fjetter fjetter left a comment


I agree with @TomAugspurger's remark about shuffle_dtype being part of the public API. If it is included in the public API, there should also be a note about which scenarios would benefit from creating the column. I'm struggling to come up with a scenario that would benefit, and if it is very rare, I would suggest not exposing this parameter (internally it's fine if it makes a difference, e.g. for set_index).

@TomAugspurger the non-uniformity of the buckets is something which didn't change here, did it? The hash bucketing logic is effectively the same, isn't it?

@rjzamora
Member Author

> I agree with @TomAugspurger's remark about shuffle_dtype being part of the public API. If it is included in the public API, there should also be a note about which scenarios would benefit from creating the column. I'm struggling to come up with a scenario that would benefit, and if it is very rare, I would suggest not exposing this parameter (internally it's fine if it makes a difference, e.g. for set_index).

Agreed - The cleanest change is probably to simplify the public API, and to always avoid creating the "_partitions" column in dask.dataframe.shuffle.shuffle. The changes to rearrange_by_column_tasks in this PR will still allow for the "_partitions"-based approach in set_index (and sort_values when added).

> @TomAugspurger the non-uniformity of the buckets is something which didn't change here, did it? The hash bucketing logic is effectively the same, isn't it?

Right - These changes will not actually change the hash statistics. It just changes when hashing is performed (and exposes a new DataFrame.shuffle method)

@rjzamora
Member Author

rjzamora commented Apr 30, 2020

Note: I will need to revise the changes here a bit to align with #6137

@rjzamora rjzamora changed the title [WIP] Generalize rearrange_by_column_tasks and optimize shuffle [REVIEW] Generalize rearrange_by_column_tasks and add DataFrame.shuffle May 11, 2020
@rjzamora
Member Author

Update:

  • Removed shuffle_dtype. Will always avoid creating "_partitions" when the user is passing in a column name (or list of column names) to DataFrame.shuffle.
  • Added the _simple_rearrange_by_column_tasks code path for the case that the output partition count is small (less than max_branch). The primary motivation is the case that the output partition count is small, but different from the input partition count. Unless I am misunderstanding, the current algorithm will first perform a df.npartitions-to-df.npartitions shuffle, and then repartition. A simpler algorithm can do this all in a single shuffle.
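The single-stage idea can be sketched with plain pandas. This is a hypothetical simplification (single_stage_shuffle is not the PR's actual code): each input partition is split directly into the target number of output groups, and each output partition is the concatenation of its group from every input, so no intermediate npartitions-to-npartitions shuffle or follow-up repartition is needed:

```python
import pandas as pd

# Sketch of a single-stage shuffle for a small output partition count.
def single_stage_shuffle(parts, col, npartitions_out):
    groups = []
    for df in parts:  # one pass over the input partitions
        part_id = pd.util.hash_pandas_object(df[col], index=False) % npartitions_out
        # split this input directly into the final output groups
        groups.append({i: df[part_id.values == i] for i in range(npartitions_out)})
    # gather: output partition i collects group i from every input
    return [pd.concat([g[i] for g in groups]) for i in range(npartitions_out)]
```

The multi-stage (max_branch) algorithm exists to cap the number of transfer tasks when the partition count is large; when the output count is small, this direct split-and-gather is sufficient.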

@rjzamora rjzamora changed the title [REVIEW] Generalize rearrange_by_column_tasks and add DataFrame.shuffle [WIP] Generalize rearrange_by_column_tasks and add DataFrame.shuffle May 11, 2020
@rjzamora rjzamora changed the title [WIP] Generalize rearrange_by_column_tasks and add DataFrame.shuffle Generalize rearrange_by_column_tasks and add DataFrame.shuffle May 20, 2020

@TomAugspurger TomAugspurger left a comment


Thanks for working on this. Overall it's looking nice.

IIUC, this hasn't changed the data model at all: there's no indication on the object that the dataframe is now partitioned by one or more columns. Should we? For index-based partitioning, we have .divisions, and known divisions are reflected in the repr. Or perhaps that doesn't make sense to add, since we don't know the hashed values like we know the divisions? I haven't thought through this fully.

@rjzamora
Member Author

rjzamora commented May 21, 2020

> IIUC, this hasn't changed the data model at all: there's no indication on the object that the dataframe is now partitioned by one or more columns. Should we? For index-based partitioning, we have .divisions, and known divisions are reflected in the repr. Or perhaps that doesn't make sense to add, since we don't know the hashed values like we know the divisions? I haven't thought through this fully.

Right - I do think it ultimately makes sense to expand the dask.dataframe data model to allow the divisions attribute to be represented as a pd.DataFrame-like object, so that the divisions can correspond to one or more columns (including or not including the index). With that said, the shuffle method added here does not result in an "order"-based partitioning, so the same concept of "divisions" is not really useful (unless we are interested in eventually storing information beyond lexicographical ordering in the divisions - which seems like a stretch to me).

@TomAugspurger
Member

Yeah, agreed that we can't have a .divisions-like attribute for column shuffle-partitioned datasets. Just wondering if the fact that it's partitioned on that column's values should be reflected in the repr. But that can wait till later!

@rjzamora rjzamora changed the title Generalize rearrange_by_column_tasks and add DataFrame.shuffle [REVIEW] Generalize rearrange_by_column_tasks and add DataFrame.shuffle May 22, 2020

@TomAugspurger TomAugspurger left a comment


This looks good.

@rjzamora do you have strong thoughts on if we should / how to expose (hash-based) column partitioning in the data model? I'd like to have an issue to collect discussion on this topic, but I'm still working through it in my head. If you don't have strong thoughts then I'll assign myself a task to write up an issue.

@rjzamora
Member Author

rjzamora commented May 26, 2020

> @rjzamora do you have strong thoughts on if we should / how to expose (hash-based) column partitioning in the data model? I'd like to have an issue to collect discussion on this topic, but I'm still working through it in my head. If you don't have strong thoughts then I'll assign myself a task to write up an issue.

No strong thoughts from me, but I will be very happy to participate in a discussion :)... Without giving it too much thought, I think we will probably want to allow the divisions to be represented by something like a pd.DataFrame object. This way, the divisions can correspond to either an Index/MultiIndex, or an arbitrary set of columns. We may end up wanting to introduce a new Divisions class to organize the necessary attributes. If the divisions correspond to hashed values rather than literal values, defining an attribute like hash_partitioned=True may do the trick. [EDIT: I guess for the hashed case we wouldn't really want a full pd.DataFrame of division values...]

@rjzamora
Member Author

Looks like failures are unrelated.

@TomAugspurger
Member

Yep, those were just fixed on master. I'll merge this and open an issue on the data model things.



6 participants