
map_partitions doesn't broadcast single-partition DataFrames #8338

@gjoseph92


In trying to implement #8306 (specifically, converting single_partition_join to blockwise), I realized that map_partitions repartitions single-partition inputs to align them with multi-partition inputs. I'm not sure whether this is intended or desired.

The obvious thing to do for single_partition_join is something like:

```python
joined = multi_partition.map_partitions(merge_chunk, single_partition, **kwargs)
```

hoping that single_partition would be broadcast to every partition of multi_partition. This would match the behavior of Array.map_blocks, where single-chunk inputs are broadcast to every chunk.
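To make the difference concrete, here is a pure-Python toy model (not dask code). A "partitioned frame" is a list of partitions of row dicts with an `idx` index field, and `toy_merge_chunk` is a hypothetical stand-in that inner-joins on a non-index `key` field. Broadcasting hands the whole single-partition table to every partition; aligning first splits it along the other input's index divisions, so a join on a column can silently lose matches. All helper names and data here are illustrative assumptions.

```python
from bisect import bisect_right

# Toy model only (not dask's implementation): a partitioned "frame" is a list
# of partitions, each a list of row dicts with an "idx" index field.


def toy_merge_chunk(left, right):
    """Hypothetical stand-in: inner-join two row lists on their "key" field."""
    return [
        {**lrow, "rhs": rrow["rhs"]}
        for lrow in left
        for rrow in right
        if lrow["key"] == rrow["key"]
    ]


def split_by_divisions(rows, divisions):
    """Split one partition so its pieces line up with `divisions` by index.

    Simplification: rows outside the division range are clamped into the
    first or last piece instead of creating new partitions.
    """
    pieces = [[] for _ in range(len(divisions) - 1)]
    for row in rows:
        i = bisect_right(divisions, row["idx"]) - 1
        pieces[min(max(i, 0), len(pieces) - 1)].append(row)
    return pieces


def map_broadcast(func, parts, single):
    """Blockwise with broadcasting: every partition sees the whole single table."""
    return [func(part, single) for part in parts]


def map_aligned(func, parts, divisions, single):
    """Aligned: the single table is first split along the index divisions."""
    pieces = split_by_divisions(single, divisions)
    return [func(part, piece) for part, piece in zip(parts, pieces)]


multi = [
    [{"idx": 0, "key": "a"}, {"idx": 1, "key": "b"}],
    [{"idx": 2, "key": "a"}, {"idx": 3, "key": "c"}],
]
divisions = (0, 2, 3)  # first index of each partition, then the last index
single = [{"idx": 0, "key": "a", "rhs": 10}, {"idx": 3, "key": "b", "rhs": 20}]

broadcast = map_broadcast(toy_merge_chunk, multi, single)
aligned = map_aligned(toy_merge_chunk, multi, divisions, single)
print(sum(map(len, broadcast)), sum(map(len, aligned)))  # 3 1
```

In the aligned version, the `key == "b"` row of `single` lands next to the second partition (by index), so it never meets the `key == "b"` row in the first partition.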

The code in question is _maybe_align_partitions, which first filters the inputs with this is_broadcastable check:

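```python
from functools import partial

# Excerpt from dask's dataframe internals: _Frame, is_broadcastable and
# align_partitions are defined elsewhere in dask.dataframe.


def _maybe_align_partitions(args):
    """Align DataFrame blocks if divisions are different.
    Note that if all divisions are unknown, but have equal npartitions, then
    they will be passed through unchanged. This is different than
    `align_partitions`, which will fail if divisions aren't all known"""
    _is_broadcastable = partial(is_broadcastable, args)
    # Only non-broadcastable dask frames take part in alignment; a
    # single-partition DataFrame is NOT excluded here.
    dfs = [df for df in args if isinstance(df, _Frame) and not _is_broadcastable(df)]
    if not dfs:
        return args
    divisions = dfs[0].divisions
    if not all(df.divisions == divisions for df in dfs):
        # Repartition onto common divisions, then put the aligned frames
        # back into their original argument positions.
        dfs2 = iter(align_partitions(*dfs)[0])
        return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
    return args
```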
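When divisions differ, align_partitions repartitions the frames onto a common set of divisions. As far as we understand the dask source, that common set is the sorted union of all input divisions. The pure-Python sketch below (a hypothetical `union_divisions` helper, not a dask function) shows why a single-partition input ends up split whenever the other input has more than one partition.

```python
from heapq import merge


def union_divisions(*division_tuples):
    """Sorted, de-duplicated union of several sorted division tuples.

    Hypothetical helper approximating how aligned divisions are computed;
    assumes every input has known divisions.
    """
    out = []
    for d in merge(*division_tuples):  # lazily merge the already-sorted inputs
        if not out or d != out[-1]:
            out.append(d)
    if len(out) == 1:
        # Keep two boundaries so the result is still a valid divisions tuple.
        out = [out[0], out[0]]
    return tuple(out)


multi = (0, 2, 3)  # two partitions
single = (0, 3)    # one partition spanning the same range
wider = (0, 5)     # one partition spanning a wider range

aligned = union_divisions(multi, single)
print(aligned, len(aligned) - 1)      # (0, 2, 3) 2 -> single becomes 2 partitions
print(union_divisions(multi, wider))  # (0, 2, 3, 5) -> both become 3 partitions
```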

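But is_broadcastable isn't as generic as its name suggests. It specifically checks for a single-partition Series whose divisions match the minimum and maximum column names of some DataFrame argument (that is, one value per column, like a single row), to support df - df.mean():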

From dask/dask, dask/dataframe/core.py (lines 5330 to 5343 at commit 7960b6e):

```python
def is_broadcastable(dfs, s):
    """
    This Series is broadcastable against another dataframe in the sequence
    """
    return (
        isinstance(s, Series)
        and s.npartitions == 1
        and s.known_divisions
        and any(
            s.divisions == (df.columns.min(), df.columns.max())
            for df in dfs
            if isinstance(df, DataFrame)
        )
    )
```
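To show which inputs qualify, the pure-Python sketch below restates the same conditions on a hypothetical `Frame` dataclass that only carries metadata (it is not a dask type). A `df.mean()`-style Series indexed by column names passes; a single-partition DataFrame, or a single-partition Series indexed by row labels, does not.

```python
from dataclasses import dataclass, field


@dataclass
class Frame:
    """Hypothetical stand-in for a dask collection (metadata only)."""

    kind: str          # "DataFrame" or "Series"
    divisions: tuple   # index boundaries; a leading None means unknown
    columns: list = field(default_factory=list)

    @property
    def npartitions(self):
        return len(self.divisions) - 1

    @property
    def known_divisions(self):
        return len(self.divisions) > 0 and self.divisions[0] is not None


def is_broadcastable(dfs, s):
    """Same conditions as the dask check above, applied to the stand-ins."""
    return (
        s.kind == "Series"
        and s.npartitions == 1
        and s.known_divisions
        and any(
            s.divisions == (min(df.columns), max(df.columns))
            for df in dfs
            if df.kind == "DataFrame"
        )
    )


df = Frame("DataFrame", (0, 50, 99), columns=["a", "b", "c"])
means = Frame("Series", ("a", "c"))                           # like df.mean()
lookup = Frame("DataFrame", (0, 99), columns=["key", "rhs"])  # single-partition DataFrame
row_series = Frame("Series", (0, 99))                         # single partition, row-indexed

args = [df, means, lookup, row_series]
print([is_broadcastable(args, x) for x in args])  # [False, True, False, False]
```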

This was added 5 years ago in #2085 and hasn't changed since.

I'm not sure what the right behavior should be. On one hand, splitting single-partition inputs to align them, as we do now, keeps behavior consistent with multi-partition inputs. On the other hand, it is inconsistent with the counterpart function in Array (map_blocks), and with the Blockwise concept in general.

Personally, I think broadcasting single-partition DataFrames is both less surprising and a more powerful API. It's particularly annoying that there's currently no way around this. But I'm not sure how much existing map_partitions code relies on the current behavior.

If we don't want to change behavior, I think we should at least add align_inputs=True as a map_partitions kwarg (just like align_arrays in map_blocks) so you can work around this when necessary.
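A sketch of how the two options above could fit together, again on pure-Python stand-ins (`SimpleNamespace` objects in place of dask collections). The `frames_to_align` helper and its `broadcast_single` flag are hypothetical, and `align_inputs` is only the kwarg name proposed here; neither is existing dask API.

```python
from types import SimpleNamespace


def frames_to_align(args, align_inputs=True, broadcast_single=True):
    """Return the partitioned inputs that would be repartitioned before mapping.

    Hypothetical logic for the proposal, not dask's implementation:
    - align_inputs=False opts out of alignment altogether;
    - broadcast_single=True broadcasts any single-partition input instead of
      splitting it, the way Array.map_blocks treats single-chunk inputs.
    """
    if not align_inputs:
        return []
    frames = [a for a in args if hasattr(a, "divisions")]  # skip scalars etc.
    if broadcast_single:
        frames = [f for f in frames if len(f.divisions) - 1 > 1]
    # Alignment is only needed when the remaining inputs disagree on divisions.
    if len({f.divisions for f in frames}) <= 1:
        return []
    return frames


multi = SimpleNamespace(name="multi", divisions=(0, 2, 3))
single = SimpleNamespace(name="single", divisions=(0, 3))

# Current behavior: the single-partition input is aligned (split) too.
print([f.name for f in frames_to_align([multi, single], broadcast_single=False)])  # ['multi', 'single']
print(frames_to_align([multi, single]))                      # [] -> single is broadcast
print(frames_to_align([multi, single], align_inputs=False))  # [] -> no alignment at all
```

Multi-partition inputs with mismatched divisions would still be aligned under this sketch; only single-partition inputs change behavior.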

cc @jcrist @ian-r-rose @rjzamora @jrbourbeau
