Broadcast-based Merge implementation #7143
Conversation
mrocklin
left a comment
Tiny comments!
Please don't block on any of these. The probability that github conversations get lost in my inbox these days is high.
dask/dataframe/multi.py
| """ | ||
| ntasks_shuffle = n_r * math.log2(n_r) + n_l * math.log2(n_l) | ||
| ntasks_bcast = n_r * n_l + n_l | ||
| return ntasks_bcast < ntasks_shuffle * 0.75 |
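For reference, the quoted heuristic can be sketched as a standalone function (a minimal sketch, not the PR's exact code; `n_l`/`n_r` are the partition counts of the two collections, and `factor` is the 0.75 scaling discussed in this thread):

```python
import math

# Minimal sketch of the quoted heuristic (not dask's exact code).
# n_l / n_r are the partition counts of the two collections being
# merged; the broadcast estimate assumes the n_l-partition side is
# the one being broadcast.
def prefer_broadcast(n_l, n_r, factor=0.75):
    # Rough task count for a shuffle-based merge of both collections
    ntasks_shuffle = n_r * math.log2(n_r) + n_l * math.log2(n_l)
    # Rough task count for broadcasting the n_l partitions to every
    # partition of the other collection
    ntasks_bcast = n_r * n_l + n_l
    return ntasks_bcast < ntasks_shuffle * factor

# Broadcasting 2 partitions against 240 beats the shuffle estimate...
print(prefer_broadcast(2, 240))
# ...but broadcasting 64 against 64 does not.
print(prefer_broadcast(64, 64))
```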
I have a slight preference to include this logic directly at the call site if you're ok with that. Rationale here
I'm curious, why the 0.75 factor?
> I have a slight preference to include this logic directly at the call site if you're ok with that. Rationale here

Sounds good to me.

> I'm curious, why the 0.75 factor?

Just a rough scaling factor (I noticed it was best to prefer "shuffle" when the heuristic calculation was "close"). The heuristic is absolutely something I need to revisit/test :)
Should this factor be configurable perhaps with a default of 0.75? May allow us to adjust it more easily in the future or allow informed users to adjust based on their needs
> Should this factor be configurable perhaps with a default of 0.75? May allow us to adjust it more easily in the future or allow informed users to adjust based on their needs

Yes - I am thinking that the user should have the option to configure the heuristic somehow, but I'm not sure if it should be this scaling factor. If we decide that the existing heuristic actually makes sense, then this may indeed be the best approach.
Should someone be reviewing this again? As usual, I am happy to merge this since it's an isolated feature that people have to opt in to.

Thanks for the nudge here @jsignell - I feel that this is a very nice optimization in the case that a many-partition collection is merged with a "few"-partition collection. However, the current state of the PR will automatically use the broadcast merge if a conservative-but-rough heuristic is met (so the user does not need to opt in).

Ah ok. I read it too quickly.
```python
    # Note on `broadcast_bias`:
    # We can expect the broadcast merge to be competitive with
    # the shuffle merge when the number of partitions in the
    # smaller collection is less than the logarithm of the number
    # of partitions in the larger collection. By default, we add
    # a small preference for the shuffle-based merge by multiplying
    # the log result by a 0.5 scaling factor. We call this factor
    # the `broadcast_bias`, because a larger number will make Dask
    # more likely to select the `broadcast_join` code path. If
    # the user specifies a floating-point value for the `broadcast`
    # kwarg, that value will be used as the `broadcast_bias`.
    if broadcast or (n_small < math.log2(n_big) * broadcast_bias):
```
@jakirkham - Pinging you here since you made a related comment earlier. I simplified the heuristic and added an option for the user to specify a `broadcast_bias` (as `merge(..., broadcast=<float>)`). This should allow users (and/or dask_cudf) to tune the heuristic a bit. Note that it probably makes sense to use a larger bias in dask_cudf - I am trying to be a bit more conservative in dask.dataframe.
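To make the control flow concrete, here is an illustrative sketch (assumed shape only, inferred from the quoted diff, not dask's actual implementation) of how the `broadcast` kwarg, the `broadcast_bias`, and the partition-count heuristic interact:

```python
import math

# Illustrative sketch of the decision described above (not dask's
# actual code). `broadcast` may be None (use the heuristic), a bool
# (force the choice), or a float (reinterpreted as the bias).
def use_broadcast_path(broadcast, n_small, n_big):
    # A float value for `broadcast` becomes the bias; otherwise the
    # default 0.5 preference toward the shuffle-based merge is used.
    broadcast_bias = broadcast if isinstance(broadcast, float) else 0.5
    if broadcast is False:
        return False
    return broadcast is True or n_small < math.log2(n_big) * broadcast_bias

# Default bias 0.5: 2 < log2(240) * 0.5 ~= 3.95, so broadcast is chosen.
print(use_broadcast_path(None, n_small=2, n_big=240))
# A larger bias (broadcast=2.0) makes the broadcast path more likely:
# 8 < log2(240) * 2.0 ~= 15.8.
print(use_broadcast_path(2.0, n_small=8, n_big=240))
```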
Should that live in dask.config then or are you still refining this further?
We could check dask.config for a broadcast_bias default if the user doesn't specify anything. Is that what you have in mind?
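For what it's worth, a config-backed fallback could look roughly like this (the `"dataframe.broadcast-bias"` key below is invented for illustration and is not an existing Dask option):

```python
import dask

# Hypothetical sketch only: "dataframe.broadcast-bias" is an invented
# config key, not an actual Dask option. An explicit user-supplied
# float wins; otherwise we fall back to the config value, then to a
# hardcoded default.
def resolve_bias(broadcast=None, hardcoded_default=0.5):
    if isinstance(broadcast, float):
        return broadcast  # explicit kwarg takes precedence
    return dask.config.get("dataframe.broadcast-bias", hardcoded_default)

with dask.config.set({"dataframe.broadcast-bias": 0.75}):
    print(resolve_bias())            # picked up from config
print(resolve_bias(broadcast=2.0))   # explicit kwarg overrides
```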
Yea exactly. Though I don't have a good intuition about how broadly this parameter may apply to usage patterns. So happy to defer to you on whether a config parameter makes sense
Yeah - I'm honestly not sure. I'm assuming the best choice depends on specific characteristics of the data as well as the hardware. Therefore, it probably does make sense to make the bias configurable (though I would expect 99% of users to pass the broadcast arg if default usage wasn't sufficient).
Ok if it is specific to the data then yeah a config variable is probably not needed. If we do find it makes sense in the future, we can always add it later
@rjzamora is this ready for another review after the two commits 12 days ago?

I think this PR is in good shape now (just added support for the …)

Merging. From a high level this looks good to me. I also switched … I'm excited about this feature and seeing how well it helps in practice. Thank you for your effort @rjzamora and for the careful review @jakirkham
Includes the necessary changes to test [dask#7143](dask/dask#7143). More specifically, this adds the following options:

- `--base-chunks`: Number of base-DataFrame partitions (default: n_workers)
- `--other-chunks`: Number of other-DataFrame partitions (default: n_workers)
- `--broadcast-join`: Use broadcast join when possible
- `--shuffle-join`: Use shuffle join (takes precedence over `--broadcast-join`)

Authors:
- Richard (Rick) Zamora (@rjzamora)

Approvers:
- Peter Andreas Entschev (@pentschev)

URL: #507
* upstream/master: (43 commits)
  - bump version to 2021.03.0
  - Bump minimum version of distributed (dask#7328)
  - Fix `percentiles_summary` with `dask_cudf` (dask#7325)
  - Temporarily revert recent Array.__setitem__ updates (dask#7326)
  - Blockwise.clone (dask#7312)
  - NEP-35 duck array update (dask#7321)
  - Don't allow setting `.name` for array (dask#7222)
  - Use nearest interpolation for creating percentiles of integer input (dask#7305)
  - Test `exp` with CuPy arrays (dask#7322)
  - Check that computed chunks have right size and dtype (dask#7277)
  - pytest.mark.flaky (dask#7319)
  - Contributing docs: add note to pull the latest git tags before pip installing Dask (dask#7308)
  - Support for Python 3.9 (dask#7289)
  - Add broadcast-based merge implementation (dask#7143)
  - Add split_every to graph_manipulation (dask#7282)
  - Typo in optimize docs (dask#7306)
  - dask.graph_manipulation support for xarray.Dataset (dask#7276)
  - Add plot width and height support for Bokeh 2.3.0 (dask#7297)
  - Add numpy functions tri, triu_indices, triu_indices_from, tril_indices, tril_indices_from (dask#6997)
  - Remove "cleanup" task in dataframe on-disk shuffle. The partd directory (dask#7260)
  - ...
Dask-DataFrame will currently perform a shuffle-based merge for all cases where there are multiple partitions in both collections (and when the join is not on an index with known divisions). The shuffle-based approach makes a lot of sense when both of the collections comprise many partitions, but makes less sense when one of the collections is huge (making the shuffle expensive) and the other is very small (only a few partitions). This PR introduces a `bcast_join` for this latter scenario.

NOTE: There is still more work to do here, but preliminary tests suggest that the broadcast algorithm is indeed faster than a shuffle-based merge when a 240-partition DataFrame is joined with a DataFrame comprising 2-4 partitions (with the benefit increasing as the large DataFrame becomes larger)... I will share benchmarks here as the remaining tasks are completed.
TODO:

- [ ] `how="right"`
- [ ] `npartitions` kwarg (not sure this "should" be supported?)
- [ ] `dask.dataframe` and `dask_cudf`

cc @beckernick