Description
When an auxiliary dask DataFrame is passed to `map_partitions` positionally, its partitions are aligned with the main DataFrame's and the function receives one matching partition of each per task. When the same DataFrame is passed as a kwarg, the function instead receives the entire DataFrame concatenated into a single pandas object.
It's somewhat unclear from the docstring if this is expected behavior:
```
Arguments and keywords to pass to the function. At least one of the
args should be a Dask.dataframe. Arguments and keywords may contain
``Scalar``, ``Delayed`` or regular python objects. DataFrame-like args
(both dask and pandas) will be repartitioned to align (if necessary)
before applying the function.
```
but either way it is a bit unintuitive that passing the same object positionally vs. as a kwarg changes the behavior like this.
Minimal Complete Verifiable Example:

```python
import dask

df = dask.datasets.timeseries()
df2 = dask.datasets.timeseries()

def x(p, other):
    assert len(p) == len(other), f"{len(p)=}, {len(other)=}"
    return p.x - other.x

# Works, because `df2` is given positionally
df.map_partitions(x, df2).compute()

# Fails, because `df2` is given as a kwarg
df.map_partitions(x, other=df2).compute()
# AssertionError: len(p)=86400, len(other)=2592000
```

The problem is likely that `map_partitions` finalizes (via `unpack_collections`) everything in `kwargs`. Additionally, `partitionwise_graph` has lots of unpacking logic to turn args into proper blockwise `(name, indices)` pairs, but kwargs are passed straight through to `blockwise` and treated as 0-d objects.
Anything else we need to know?:
I would be okay with keeping this behavior, but it should be documented much more clearly. Possibly it should even raise a warning when a multi-chunk DataFrame/Array is given as a kwarg.
Environment:
- Dask version: 824244c
- Python version: 3.9.1
- Operating System: macOS
- Install method (conda, pip, source): source