Implement shuffle-based-groupby for simpler aggregations #9487

@ian-r-rose

Description

In #9406 @rjzamora did some nice work implementing a shuffle-based groupby/agg operation for dask dataframe. However, it is currently only implemented for `groupby(...).agg({...})`, and some of the simpler aggregation functions which might benefit from it are unimplemented. This includes `groupby(...).min`, `groupby(...).sum`, and similar:

dask/dask/dataframe/groupby.py

Lines 1239 to 1250 in 024df34

def _aca_agg(
    self,
    token,
    func,
    aggfunc=None,
    meta=None,
    split_every=None,
    split_out=1,
    chunk_kwargs=None,
    aggregate_kwargs=None,
):
    if aggfunc is None:
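For context, `_aca_agg` builds an apply-concat-apply (tree-reduction) graph: each partition is aggregated, then the per-partition results are combined in rounds of `split_every` until a single output remains. Here is a rough, illustrative pure-Python sketch of that reduction pattern for `min` (dask builds a lazy task graph rather than looping eagerly; `combine_min` and `tree_reduce_min` are hypothetical names, not dask API):

```python
def combine_min(dicts):
    # Combine several per-partition {key: min} results into one.
    out = {}
    for d in dicts:
        for k, v in d.items():
            out[k] = min(v, out[k]) if k in out else v
    return out

def tree_reduce_min(parts, split_every=2):
    # Repeatedly combine groups of `split_every` partial results
    # until only a single result dict remains.
    while len(parts) > 1:
        parts = [
            combine_min(parts[i : i + split_every])
            for i in range(0, len(parts), split_every)
        ]
    return parts[0]

# Per-partition minima, as the chunk step might produce them.
parts = [{"a": 3, "b": 5}, {"a": 1}, {"b": 2, "c": 9}]
reduced = tree_reduce_min(parts)
```

Note that this tree reduction always produces a single output partition, which is exactly why large-cardinality groupbys want the shuffle-based path with `split_out > 1` instead.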

So, the following snippet would use a shuffle-based approach:

import dask.datasets

ddf = dask.datasets.timeseries()
ddf.groupby("id").agg({"x": "mean"}, split_out=2)

While the following, semantically identical, snippet would not:

import dask.datasets

ddf = dask.datasets.timeseries()
ddf.groupby("id").x.mean(split_out=2)

It should be relatively straightforward to retrofit the simpler aggregation functions to also take advantage of the new shuffle-based approach where appropriate.
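The shuffle-based approach avoids the tree reduction entirely: rows are first routed to one of `split_out` partitions by hashing the groupby key, so every group lands in exactly one output partition, and each partition can then be aggregated independently. A hypothetical pure-Python sketch of that idea (`shuffle_groupby_min` and the toy records are illustrative only, not dask API):

```python
from collections import defaultdict

def shuffle_groupby_min(records, key, value, split_out=2):
    # Shuffle step: route each record to a partition by hashing its
    # group key, so all rows for a given key land in exactly one
    # output partition.
    partitions = [defaultdict(list) for _ in range(split_out)]
    for rec in records:
        p = hash(rec[key]) % split_out
        partitions[p][rec[key]].append(rec[value])
    # Aggregation step: each partition is reduced independently; no
    # cross-partition combine is needed, and the output naturally has
    # `split_out` partitions.
    return [{k: min(vs) for k, vs in part.items()} for part in partitions]

records = [
    {"id": "a", "x": 3},
    {"id": "b", "x": 5},
    {"id": "a", "x": 1},
    {"id": "b", "x": 7},
]
result = shuffle_groupby_min(records, "id", "x", split_out=2)
```

Each key appears in exactly one of the returned partitions, which is what makes `split_out > 1` cheap here compared to the tree-reduction path.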
