
Optimized groupby aggregations when grouping by a sorted index #8361

@gjoseph92

Description


Similar request to #2999

When grouping by the index, and the index has known divisions, most aggregations could be a simple map_partitions. Since each partition already contains all the values for an output group[^1], there's no need to exchange data between partitions.

However, in these cases we still fall back to apply_concat_apply, _cum_agg, etc., generating complex graphs that involve a lot of unnecessary data transfer.

In normal pandas use, I'm not sure how common it is to groupby the index versus a column. However, in dask, using the known divisions of the index is highly recommended (see best practices) and something users should try to do, especially with large datasets. In particular, once shuffling performs better (xref dask/distributed#5435), a pattern of doing one set_index up front (or using a partitioned data format like Parquet) and then many fast operations on that should be effective.

I'd want something like this to involve minimal data transfer after the set_index step:

```python
import dask.dataframe as dd

df = dd.read_parquet(...)
# a savvy user recognizes this is worthwhile since there are multiple date ops to do next
df_by_day = df.set_index("date")

daily_users = df_by_day.groupby(["date", "user_id"]).count()
daily_sales = df_by_day.groupby(["date", "sale_amt"]).sum()

daily_summary = daily_users.merge(daily_sales, on="date")
# ^ this should be fast since both groupbys have retained their `divisions`
```

Someday, it might be nice if users didn't even have to do the set_index, and we had an optimization that could recognize that multiple groupbys would benefit from a pre-shuffle and insert one automatically. However, that's a hard optimization to implement (might require HLEs #7933) and a ways off. Getting users to understand that they should use set_index more carefully than in pandas, and its importance as a performance tool, seems easier. As we do that, let's make sure we're taking as much advantage of it as possible.

[^1]: One exception: when an index value spans multiple partitions (for example, because all the rows in a partition share the same index value), those partitions do need to be combined. With divisions=[0, 1, 2, 2, 4, 5], the partitions covering 1-2, 2-2, and 2-4 can all contain the value 2, so they would need to be combined, probably using the normal apply_concat_apply logic. However, since we know the divisions, we can be selective about where we do this and avoid most of the transfer. With well-balanced partitions this should be a relatively rare case, and there usually shouldn't be more than a handful of consecutive partitions with the same value.
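A sketch of how known divisions make that selectivity cheap (a hypothetical helper, not an existing dask function): repeated values in `divisions` pinpoint exactly which runs of consecutive partitions can share an index value and therefore still need a combine step.

```python
def overlapping_partition_runs(divisions):
    """Find runs of consecutive partitions that can share an index value.

    Hypothetical helper: partition i spans [divisions[i], divisions[i + 1]],
    so a division value that repeats means one index value spills across
    several partitions, and only those partitions need a combine step.
    Returns (first, last) partition-index pairs, inclusive.
    """
    n_partitions = len(divisions) - 1
    runs = []
    i = 0
    while i < len(divisions):
        j = i
        while j + 1 < len(divisions) and divisions[j + 1] == divisions[i]:
            j += 1
        if j > i:
            # Partitions i-1 through j can all contain the repeated value.
            runs.append((max(i - 1, 0), min(j, n_partitions - 1)))
        i = j + 1
    return runs

# The footnote's example: only partitions 1, 2, and 3 (spanning 1-2, 2-2,
# and 2-4) need combining; partitions 0 and 4 are already final.
print(overlapping_partition_runs([0, 1, 2, 2, 4, 5]))  # [(1, 3)]
```

All other partitions can go straight through map_partitions; only the listed runs get the usual combine treatment.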

Metadata

Labels: dataframe, needs attention
