Broadcast-based Merge implementation #7143

Merged: mrocklin merged 15 commits into dask:master from rjzamora:bcast-merge on Mar 3, 2021
Conversation

@rjzamora (Member) commented Jan 29, 2021

Dask-DataFrame will currently perform a shuffle-based merge for all cases where there are multiple partitions in both collections (and when the join is not on an index with known divisions). The shuffle-based approach makes a lot of sense when both of the collections comprise many partitions, but makes less sense when one of the collections is huge (making the shuffle expensive) and the other is very small (only a few partitions). This PR introduces a bcast_join for this latter scenario.

NOTE: There is still more work to do here, but preliminary tests suggest that the broadcast algorithm is indeed faster than a shuffle-based merge when a 240-partition DataFrame is joined with a DataFrame comprising 2-4 partitions (with the benefit increasing as the large DataFrame grows)... I will share benchmarks here as the remaining tasks are completed.

TODO:

  • Add support for how="right"
  • Add support for npartitions kwarg (not sure this "should" be supported?)
  • Translate into an HLG layer
  • Expand testing
  • Investigate best heuristic for shuffle vs. bcast default. Need to test various workflows with dask.dataframe and dask_cudf

cc @beckernick

@mrocklin (Member) left a comment

Tiny comments!

Please don't block on any of these. The probability that github conversations get lost in my inbox these days is high.

"""
ntasks_shuffle = n_r * math.log2(n_r) + n_l * math.log2(n_l)
ntasks_bcast = n_r * n_l + n_l
return ntasks_bcast < ntasks_shuffle * 0.75
Member:

I have a slight preference to include this logic directly at the call site if you're ok with that. Rationale here

Member:

I'm curious, why the 0.75 factor?

Member Author:

> I have a slight preference to include this logic directly at the call site if you're ok with that. Rationale here

Sounds good to me.

> I'm curious, why the 0.75 factor?

Just a rough scaling factor (I noticed it was best to prefer "shuffle" when the heuristic calculation was "close"). The heuristic is absolutely something I need to revisit/test :)
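The task-count comparison from the snippet above can be exercised standalone; the `prefer_broadcast` wrapper below is purely illustrative (not a function in the PR):

```python
import math

def prefer_broadcast(n_l, n_r, factor=0.75):
    # Rough task counts from the early version of this PR: a shuffle
    # costs about n*log2(n) tasks per collection, while a broadcast
    # costs n_r * n_l join tasks plus n_l concatenation tasks.
    ntasks_shuffle = n_r * math.log2(n_r) + n_l * math.log2(n_l)
    ntasks_bcast = n_r * n_l + n_l
    # The 0.75 factor biases close calls toward the shuffle path.
    return ntasks_bcast < ntasks_shuffle * factor

# 240 large partitions joined with 4 small ones: broadcast wins.
print(prefer_broadcast(240, 4))    # True
# Two comparably sized collections: shuffle wins.
print(prefer_broadcast(240, 120))  # False
```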

Member:

Should this factor be configurable perhaps with a default of 0.75? May allow us to adjust it more easily in the future or allow informed users to adjust based on their needs

Member Author:

> Should this factor be configurable perhaps with a default of 0.75? May allow us to adjust it more easily in the future or allow informed users to adjust based on their needs

Yes - I am thinking that the user should have the option to configure the heuristic somehow, but I'm not sure if it should be this scaling factor. If we decide that the existing heuristic actually makes sense, then this may indeed be the best approach.

@jsignell (Member):

Should someone be reviewing this again? As usual, I am happy to merge this since it's an isolated feature that people have to opt in to.

@rjzamora (Member Author):

> ...that people have to opt in to

Thanks for the nudge here @jsignell - I feel that this is a very nice optimization in the case that a many-partition collection is merged with a "few"-partition collection. However, the current state of the PR will automatically use the broadcast merge whenever a conservative (but rough) heuristic is satisfied, so the user does not actually need to opt in.

@jsignell (Member):

Ah ok. I read it too quickly.

Comment on lines +650 to +661
# Note on `broadcast_bias`:
# We can expect the broadcast merge to be competitive with
# the shuffle merge when the number of partitions in the
# smaller collection is less than the logarithm of the number
# of partitions in the larger collection. By default, we add
# a small preference for the shuffle-based merge by multiplying
# the log result by a 0.5 scaling factor. We call this factor
# the `broadcast_bias`, because a larger number will make Dask
# more likely to select the `broadcast_join` code path. If
# the user specifies a floating-point value for the `broadcast`
# kwarg, that value will be used as the `broadcast_bias`.
if broadcast or (n_small < math.log2(n_big) * broadcast_bias):
Member Author:

@jakirkham - Pinging you here since you made a related comment earlier. I simplified the heuristic and added an option for the user to specify a broadcast_bias (as merge(..., broadcast=<float>)). This should allow users (and/or dask_cudf) to tune the heuristic a bit. Note that it probably makes sense to use a larger bias in dask_cudf - I am trying to be a bit more conservative in dask.dataframe.
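A standalone sketch of the merged decision logic described in the comment block above (the `use_broadcast` name and signature are illustrative only, not Dask's actual internal API):

```python
import math

def use_broadcast(n_small, n_big, broadcast=None):
    # Mirrors the heuristic from the comment block: explicit True/False
    # forces the decision, a float becomes the `broadcast_bias`, and
    # otherwise the default bias of 0.5 is used.
    if broadcast is True or broadcast is False:
        return broadcast
    broadcast_bias = broadcast if isinstance(broadcast, float) else 0.5
    return n_small < math.log2(n_big) * broadcast_bias

print(use_broadcast(2, 240))       # True: 2 < log2(240) * 0.5 ~= 3.95
print(use_broadcast(8, 240))       # False with the default bias
print(use_broadcast(8, 240, 2.5))  # True with a larger user-supplied bias
```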

Member:

Should that live in dask.config then or are you still refining this further?

Member Author:

We could check dask.config for a broadcast_bias default if the user doesn't specify anything. Is that what you have in mind?

Member:

Yea exactly. Though I don't have a good intuition about how broadly this parameter may apply to usage patterns. So happy to defer to you on whether a config parameter makes sense

Member Author:

Yeah - I'm honestly not sure. I'm assuming the best choice depends on specific characteristics of the data as well as the hardware. Therefore, it probably does make sense to make the bias configurable (though I would expect 99% of users to pass the broadcast arg if default usage wasn't sufficient).

Member:

Ok, if it is specific to the data then a config variable is probably not needed. If we do find it makes sense in the future, we can always add it later.

@beckernick (Member):

@rjzamora is this ready for another review after the two commits 12 days ago?

@rjzamora (Member Author):

I think this PR is in good shape now (just added support for the npartitions argument). The broadcast-based merge is only used in the case that one collection has many more partitions than the other. My intuition is that we should fine tune the heuristic and/or add a configuration option for the "broadcast bias" in a follow-up PR (if necessary).

@mrocklin (Member) commented Mar 3, 2021

Merging. From a high level this looks good to me. I also switched to broadcast=True and then ran the test suite, and things were fine, which gives good confidence in correctness. I agree that tweaking the heuristic might be worthwhile in the future.

I'm excited about this feature and about seeing how much it helps in practice. Thank you for your effort @rjzamora and for the careful review @jakirkham.

@mrocklin mrocklin merged commit f4cfe0e into dask:master Mar 3, 2021
@rjzamora rjzamora deleted the bcast-merge branch March 3, 2021 22:20
rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this pull request Mar 5, 2021
Includes the necessary changes to test [dask#7143](dask/dask#7143).  More specifically, this adds the following options:

- `--base-chunks` : Number of base-DataFrame partitions (default: n_workers)
- `--other-chunks` : Number of other-DataFrame partitions (default: n_workers)
- `--broadcast-join` : Use broadcast join when possible
- `--shuffle-join` : Use shuffle join (takes precedence over '--broadcast-join')

Authors:
  - Richard (Rick) Zamora (@rjzamora)

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #507
dcherian added a commit to dcherian/dask that referenced this pull request Mar 8, 2021
* upstream/master: (43 commits)
  bump version to 2021.03.0
  Bump minimum version of distributed (dask#7328)
  Fix `percentiles_summary` with `dask_cudf` (dask#7325)
  Temporarily revert recent Array.__setitem__ updates (dask#7326)
  Blockwise.clone (dask#7312)
  NEP-35 duck array update (dask#7321)
  Don't allow setting `.name` for array (dask#7222)
  Use nearest interpolation for creating percentiles of integer input (dask#7305)
  Test `exp` with CuPy arrays (dask#7322)
  Check that computed chunks have right size and dtype (dask#7277)
  pytest.mark.flaky (dask#7319)
  Contributing docs: add note to pull the latest git tags before pip installing Dask (dask#7308)
  Support for Python 3.9 (dask#7289)
  Add broadcast-based merge implementation (dask#7143)
  Add split_every to graph_manipulation (dask#7282)
  Typo in optimize docs (dask#7306)
  dask.graph_manipulation support for xarray.Dataset (dask#7276)
  Add plot width and height support for Bokeh 2.3.0 (dask#7297)
  Add numpy functions tri, triu_indices, triu_indices_from, tril_indices, tril_indices_from (dask#6997)
  Remove "cleanup" task in dataframe on-disk shuffle. The partd directory (dask#7260)
  ...