Enable automatic column projection for groupby aggregations#9442
jrbourbeau merged 3 commits into dask:main
Conversation
jrbourbeau
left a comment
Thanks @rjzamora! Automating column projection where possible would be great 👍
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
```python
# Make sure dict-based aggregation specs result in an
# explicit `getitem` layer to improve column projection
if isinstance(spec, dict):
    assert hlg_layer(result1.dask, "getitem")
```
I like that this assert is generic, but I do wonder if it's too generic and could still continue to pass if we somehow lose blockwise + getitem fusion in the future. Thoughts on how we might be able to more explicitly test that the column projection we're after is indeed happening?
This is a good point. I did consider adding another getitem-fusion test, but it seemed to me like we already have decent coverage for the getitem optimization. I decided that the "important" change here is that the getitem layer is created, but I'm open to improving the coverage.
And what happens when we group by on the index column @rjzamora? Thanks! It seems that this is done:

And it raises a KeyError.

Good catch @odovad - thanks for pointing this out!
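For context, here is a minimal sketch of the failure mode discussed above, illustrated with pandas (whose column semantics Dask-DataFrame mirrors; the data is invented for illustration). Grouping by the index works by level name, but projecting the groupby key as if it were a column raises a `KeyError`:

```python
import pandas as pd

# Hypothetical frame where the groupby key "id" is the index, not a column
pdf = pd.DataFrame(
    {"x": [1.0, 2.0, 3.0]},
    index=pd.Index([1, 1, 2], name="id"),
)

# Grouping by "id" works even though it is the index level, not a column:
result = pdf.groupby("id").agg({"x": "mean"})

# ...but naively projecting the key as a column fails, because "id"
# is not among the columns -- this is the KeyError reported above:
try:
    pdf[["id", "x"]]
    raised = False
except KeyError:
    raised = True
```

This is why an automatic projection step must check whether the groupby key is a column before selecting it.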
This PR corresponds to the Dask-cudf version of dask/dask#9442, which was found to improve the performance of many groupby-based workflows. After this PR:

```python
import dask_cudf

path = "/criteo-dataset/day_0.parquet"
ddf = dask_cudf.read_parquet(path, split_row_groups=10)

# The following takes <2s with this PR, and fails with
# an OOM error on main (using a 32GB GPU):
ddf.groupby("C1").agg({"C2": "mean"}).compute()
```

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #12124
While looking at the h2o benchmark queries in coiled-runtime, I noticed that every query includes an explicit column selection operation right before the groupby-aggregation. There is nothing wrong with this - explicit column selection is GOOD Dask-DataFrame practice. Doing this produces a `getitem`-based `Blockwise` layer in the high-level graph that can often be projected into the IO layer at graph-optimization time. The "problem" I see here is that most Dask users are unlikely to know about optimization opportunities like this.

This PR proposes that Dask automatically add explicit column-selection operations for groupby aggregations with dict-based aggregation specs. This change is relatively simple, but results in a significant performance boost for naive user code. For example:

This PR: `Wall time: 7.58 s`
main: `Wall time: 18.8 s`

Note that this PR will effectively change `filtered.groupby('id').agg({'x':'mean'})` to `filtered[['id', 'x']].groupby('id').agg({'x':'mean'})`

- [x] `pre-commit run --all-files`
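The rewrite above relies on the two forms being equivalent. A minimal sketch of that equivalence, using pandas for illustration (Dask-DataFrame follows the same column semantics; the data here is invented):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 1, 2, 2],
        "x": [1.0, 2.0, 3.0, 4.0],
        "unused": ["a", "b", "c", "d"],  # never touched by the aggregation
    }
)

# Naive form: the aggregation spec only mentions "x", but every
# column is carried through the graph up to the aggregation step.
naive = df.groupby("id").agg({"x": "mean"})

# Explicit form: selecting only the needed columns up front is what
# lets Dask project the selection into the IO layer at optimization time.
explicit = df[["id", "x"]].groupby("id").agg({"x": "mean"})

assert naive.equals(explicit)
```

In Dask, the explicit form produces the `getitem`-based `Blockwise` layer that the optimizer can push into `read_parquet`, so only the referenced columns are ever read from disk.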