Description
While trying to implement #8306 (specifically, converting single_partition_join to blockwise), I realized that map_partitions repartitions single-partition inputs to align them with multi-partition inputs. I'm not sure whether this is intended or desired.
The obvious thing to do for single_partition_join is something like:

    joined = multi_partition.map_partitions(merge_chunk, single_partition, **kwargs)

hoping that single_partition would be broadcast to every partition in multi_partition. This would match the behavior of Array.map_blocks (single-chunk inputs are broadcast to every chunk).
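To make the hoped-for semantics concrete, here is a minimal pure-Python sketch, not dask code. Partitions are modelled as plain lists, and `map_partitions_broadcast` is a hypothetical helper invented for this illustration: any input with exactly one partition is reused for every output partition instead of being split.

```python
from typing import Any, Callable, List, Sequence

Partition = List[Any]  # toy stand-in for one pandas partition


def map_partitions_broadcast(
    func: Callable[..., Any], *collections: Sequence[Partition]
) -> List[Any]:
    """Apply func partition-wise, reusing single-partition inputs everywhere.

    Hypothetical helper: this models the behavior asked for in the issue,
    not what dask's map_partitions currently does.
    """
    # Partition counts of all inputs that are NOT single-partition.
    counts = {len(c) for c in collections if len(c) != 1}
    if len(counts) > 1:
        raise ValueError(f"mismatched partition counts: {sorted(counts)}")
    n = counts.pop() if counts else 1
    return [
        # Single-partition inputs contribute their only partition each time.
        func(*(c[0] if len(c) == 1 else c[i] for c in collections))
        for i in range(n)
    ]


multi_partition = [[1, 2], [3, 4], [5, 6]]  # three partitions
single_partition = [[10, 20]]               # one partition

joined = map_partitions_broadcast(
    lambda left, right: [(a, b) for a in left for b in right],
    multi_partition,
    single_partition,
)
```

In this model every output partition sees the whole of `single_partition`, which is what a per-partition merge against a small table needs.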
The code in question is _maybe_align_partitions, which first runs an is_broadcastable check against the inputs:
Lines 155 to 170 in 7960b6e

    def _maybe_align_partitions(args):
        """Align DataFrame blocks if divisions are different.

        Note that if all divisions are unknown, but have equal npartitions, then
        they will be passed through unchanged. This is different than
        `align_partitions`, which will fail if divisions aren't all known"""
        _is_broadcastable = partial(is_broadcastable, args)
        dfs = [df for df in args if isinstance(df, _Frame) and not _is_broadcastable(df)]
        if not dfs:
            return args

        divisions = dfs[0].divisions
        if not all(df.divisions == divisions for df in dfs):
            dfs2 = iter(align_partitions(*dfs)[0])
            return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
        return args

But is_broadcastable isn't as generic as it sounds: it specifically checks for a single-partition Series whose divisions equal the min and max of a DataFrame's column labels, to support df - df.mean():
Lines 5330 to 5343 in 7960b6e

    def is_broadcastable(dfs, s):
        """
        This Series is broadcastable against another dataframe in the sequence
        """
        return (
            isinstance(s, Series)
            and s.npartitions == 1
            and s.known_divisions
            and any(
                s.divisions == (df.columns.min(), df.columns.max())
                for df in dfs
                if isinstance(df, DataFrame)
            )
        )

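To illustrate how narrow that check is, the sketch below re-implements the same condition against toy stand-ins. `ToySeries`, `ToyDataFrame`, and `toy_is_broadcastable` are hypothetical names made up for this example, and the assumption that a reduction like df.mean() yields divisions equal to the min and max column labels is inferred from the check itself rather than verified against dask.

```python
from dataclasses import dataclass
from typing import Sequence, Tuple


@dataclass
class ToySeries:
    """Hypothetical stand-in for a dask Series: only what the check inspects."""
    divisions: Tuple  # (first index label, last index label) when npartitions == 1
    npartitions: int = 1


@dataclass
class ToyDataFrame:
    """Hypothetical stand-in for a dask DataFrame."""
    columns: Tuple[str, ...]
    npartitions: int = 1


def toy_is_broadcastable(dfs: Sequence, s) -> bool:
    """Mirror of the quoted condition, written against the toy classes."""
    return (
        isinstance(s, ToySeries)
        and s.npartitions == 1
        and None not in s.divisions  # stands in for s.known_divisions
        and any(
            s.divisions == (min(df.columns), max(df.columns))
            for df in dfs
            if isinstance(df, ToyDataFrame)
        )
    )


df = ToyDataFrame(columns=("a", "b", "c"), npartitions=4)
col_means = ToySeries(divisions=("a", "c"))  # indexed by column name, like df.mean()
lookup = ToySeries(divisions=(0, 99))        # ordinary single-partition Series
small_df = ToyDataFrame(columns=("a", "b", "c"))  # single-partition DataFrame

print(toy_is_broadcastable([df, col_means], col_means))  # True
print(toy_is_broadcastable([df, lookup], lookup))        # False
print(toy_is_broadcastable([df, small_df], small_df))    # False
```

Under this model, an ordinary single-partition Series and any single-partition DataFrame both fail the check, so they would go on to be aligned rather than broadcast.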
This was added 5 years ago in #2085 and hasn't changed since.
I'm wondering what the right behavior should be. On one hand, splitting single-partition inputs to align them, as we do now, means behavior is consistent with multi-partition inputs. On the other hand, this is inconsistent with the corresponding function from Array (map_blocks), and inconsistent with the Blockwise concept in general.
Personally, I think broadcasting single-partition DataFrames is both less surprising and a more powerful API. It's particularly annoying that there's currently no way to opt out of the repartitioning. But I'm not sure how much existing map_partitions code relies on the current behavior.
If we don't want to change behavior, I think we should at least add align_inputs=True as a map_partitions kwarg (just like align_arrays in map_blocks) so you can work around this when necessary.
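As a rough sketch of what such a switch could mean, the toy model below contrasts the two modes. Only the kwarg name `align_inputs` comes from the proposal above; `toy_map_partitions` and `_split_like` are hypothetical, partitions are plain dicts keyed by index label, and the splitting logic is a crude approximation of aligning divisions, not dask's algorithm.

```python
from typing import Any, Callable, Dict, List

Partition = Dict[int, str]  # toy partition: index label -> value


def _split_like(single: Partition, template: List[Partition]) -> List[Partition]:
    """Split `single` so each piece covers the index range of the matching
    template partition (assumes non-empty template partitions)."""
    pieces = []
    for part in template:
        lo, hi = min(part), max(part)
        pieces.append({k: v for k, v in single.items() if lo <= k <= hi})
    return pieces


def toy_map_partitions(
    func: Callable[..., Any],
    first: List[Partition],
    *others: List[Partition],
    align_inputs: bool = True,
) -> List[Any]:
    """Hypothetical model: align_inputs=True repartitions single-partition
    inputs; align_inputs=False broadcasts them unchanged."""
    resolved = []
    for other in others:
        if len(other) == len(first):
            resolved.append(other)
        elif len(other) == 1:
            if align_inputs:
                resolved.append(_split_like(other[0], first))
            else:
                resolved.append([other[0]] * len(first))
        else:
            raise ValueError("partition counts differ and cannot be broadcast")
    return [func(part, *(o[i] for o in resolved)) for i, part in enumerate(first)]


left = [{0: "a", 1: "b"}, {2: "c", 3: "d"}]  # two partitions
right = [{1: "x", 3: "y"}]                   # one partition


def keys_seen(left_part: Partition, right_part: Partition) -> List[int]:
    # Report which rows of `right` each task receives.
    return sorted(right_part)


aligned = toy_map_partitions(keys_seen, left, right)
broadcast = toy_map_partitions(keys_seen, left, right, align_inputs=False)
print(aligned)    # [[1], [3]]
print(broadcast)  # [[1, 3], [1, 3]]
```

With alignment on, each task only receives the slice of `right` that falls in its own index range; with it off, every task receives all of `right`, which is the behavior a single-partition join wants.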