Enable automatic column projection for groupby aggregations #9442

Merged: jrbourbeau merged 3 commits into dask:main from rjzamora:groupby-getitem on Aug 30, 2022

@rjzamora rjzamora commented Aug 30, 2022

While looking at the h2o benchmark queries in coiled-runtime, I noticed that every query includes an explicit column selection operation right before the groupby-aggregation. There is nothing wrong with this: explicit column selection is GOOD Dask-DataFrame practice. Doing this produces a getitem-based Blockwise layer in the high-level graph that can often be projected into the IO Layer at graph-optimization time.

The "problem" I see here is that most Dask users are unlikely to know about optimization opportunities like this.

This PR proposes that Dask automatically add explicit column-selection operations for groupby aggregations with dict-based aggregation specs. This change is relatively simple, but results in a significant performance boost for naive user code. For example:

```python
import dask.dataframe as dd
from dask.datasets import timeseries

ddf = timeseries(end='2003-01-31')
filtered = ddf[ddf['x'] > 0.5]
aggregated = filtered.groupby('id').agg({'x':'mean'})

%time aggregated.compute()
```

This PR: Wall time: 7.58 s
main: Wall time: 18.8 s

Note that this PR will effectively change `filtered.groupby('id').agg({'x':'mean'})` to `filtered[['id', 'x']].groupby('id').agg({'x':'mean'})`.

  • Tests added / passed
  • Passes pre-commit run --all-files

@jrbourbeau jrbourbeau left a comment

Thanks @rjzamora! Automating column projection where possible would be great 👍

Comment thread dask/dataframe/groupby.py Outdated
@jrbourbeau jrbourbeau mentioned this pull request Aug 30, 2022
5 tasks
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@jrbourbeau jrbourbeau changed the title [Optimization] Enable column projection for groupby aggregations Enable automatic column projection for groupby aggregations Aug 30, 2022

@jrbourbeau jrbourbeau left a comment

Thanks @rjzamora!

Comment on lines +1084 to +1087

```python
# Make sure dict-based aggregation specs result in an
# explicit `getitem` layer to improve column projection
if isinstance(spec, dict):
    assert hlg_layer(result1.dask, "getitem")
```

I like that this assert is generic, but I do wonder if it's too generic and could continue to pass if we somehow lose blockwise + getitem fusion in the future. Thoughts on how we might be able to more explicitly test that the column projection we're after is indeed happening?

Member Author

This is a good point. I did consider adding another getitem-fusion test, but it seemed to me like we already have decent coverage for the getitem optimization. I decided that the "important" change here is that the getitem layer is created, but I'm open to improving the coverage.

@jrbourbeau jrbourbeau merged commit 19a5147 into dask:main Aug 30, 2022
@rjzamora rjzamora deleted the groupby-getitem branch August 30, 2022 21:50

odovad commented Nov 10, 2022

And what happens when we group by the index column, @rjzamora? Thanks!

It seems that this is done:

```python
_obj = self.obj[list(column_projection)]
```

And it raises a KeyError
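
The failure mode reported here can be illustrated without Dask: when the groupby key names the index, it does not appear among the frame's columns, so selecting it by name fails. One possible guard is sketched below. The helper `safe_projection` and its parameters are hypothetical names introduced for this illustration; this is not the fix that was adopted in Dask.

```python
def safe_projection(available_columns, by, spec):
    """Return the columns to select, or None if projection should be skipped.

    `available_columns` is the list of real column names (the index is
    not included), `by` is the groupby key or list of keys, and `spec`
    is a dict-based aggregation spec. All names are hypothetical.
    """
    keys = [by] if isinstance(by, str) else list(by)
    # Groupby keys first, then any aggregated columns not already listed.
    wanted = keys + [col for col in spec if col not in keys]
    # Only project when every requested name is an actual column;
    # otherwise a plain getitem on these names would raise a KeyError.
    if all(col in available_columns for col in wanted):
        return wanted
    return None


# "id" is a regular column: projection is safe.
print(safe_projection(["id", "x", "y"], "id", {"x": "mean"}))

# "id" is the index, so it is absent from the columns: skip projection.
print(safe_projection(["x", "y"], "id", {"x": "mean"}))
```

Returning `None` here simply means "fall back to the unprojected frame", which keeps the original behavior for index-based groupbys at the cost of losing the optimization in that case.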

@rjzamora
Member Author

Good catch @odovad - Thanks for pointing this out!

rapids-bot Bot pushed a commit to rapidsai/cudf that referenced this pull request Nov 14, 2022
This PR corresponds to the Dask-cudf version of dask/dask#9442, which was found to improve the performance of many groupby-based workflows. After this PR:

```python
import dask_cudf

path = "/criteo-dataset/day_0.parquet"
ddf = dask_cudf.read_parquet(path, split_row_groups=10)

# The following takes <2s with this PR, and fails with
# an OOM error on main (using a 32GB GPU):
ddf.groupby("C1").agg({"C2": "mean"}).compute()
```

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #12124