I don't believe this can be easily triggered by an end user, but if `meta` is improperly constructed internally (e.g. in dask-expr during optimization) and passed to the extension, this can cause severe data loss.
The relevant section is this (`distributed/shuffle/_shuffle.py`, lines 521 to 531 at a5a6e99):

```python
def _get_output_partition(
    self,
    partition_id: int,
    key: Key,
    **kwargs: Any,
) -> pd.DataFrame:
    try:
        data = self._read_from_disk((partition_id,))
        return convert_shards(data, self.meta)
    except KeyError:
        return self.meta.copy()
```
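To make the failure mode concrete, here is a minimal, hypothetical sketch (stand-in names, not distributed's actual implementation) of how the broad `except KeyError` can turn a `meta`/payload column mismatch into a silently empty partition:

```python
import pandas as pd

def convert_shards_sketch(df: pd.DataFrame, meta: pd.DataFrame) -> pd.DataFrame:
    # Simplified stand-in for convert_shards: touch each column that
    # meta declares; a column missing from the payload raises KeyError.
    for column in meta.columns:
        actual = df[column].dtype  # KeyError if meta has extra columns
    return df

def get_output_partition_sketch(data: pd.DataFrame, meta: pd.DataFrame) -> pd.DataFrame:
    try:
        return convert_shards_sketch(data, meta)
    except KeyError:
        # Intended to mean "no shards for this partition", but it also
        # swallows the column-mismatch KeyError raised above.
        return meta.copy()

payload = pd.DataFrame({"a": [1, 2, 3]})                 # 3 rows of real data
meta = pd.DataFrame({"a": pd.Series(dtype="int64"),
                     "b": pd.Series(dtype="float64")})   # extra column "b"

out = get_output_partition_sketch(payload, meta)
# out is meta.copy(): an empty frame, so the payload rows are silently lost
```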
In the example I am currently debugging, the `data` here looks like

while `meta` includes additional columns

`convert_shards` then raises a `KeyError` here (`distributed/shuffle/_arrow.py`, line 78 at b03efee):

```python
actual = df[column].dtype
```
which causes `_get_output_partition` to return an empty dataframe. I haven't spent time figuring out why this exception is handled the way it is, but if this logic is required, the bare minimum we have to do is verify the columns of `meta` and the payload data before transmission.
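An eager check would surface the mismatch at shuffle time instead of as an empty output partition. A minimal sketch of such a safeguard (`verify_shard_columns` is a hypothetical name, not an existing distributed function):

```python
import pandas as pd

def verify_shard_columns(df: pd.DataFrame, meta: pd.DataFrame) -> None:
    # Fail loudly if a shard's columns differ from meta, rather than
    # letting the mismatch be swallowed later by `except KeyError`.
    if list(df.columns) != list(meta.columns):
        missing = sorted(set(meta.columns) - set(df.columns))
        extra = sorted(set(df.columns) - set(meta.columns))
        raise ValueError(
            f"Shard columns do not match meta: missing={missing}, extra={extra}"
        )

meta = pd.DataFrame({"a": pd.Series(dtype="int64"),
                     "b": pd.Series(dtype="float64")})
ok = pd.DataFrame({"a": [1], "b": [2.0]})
bad = pd.DataFrame({"a": [1]})  # the mismatch from this report: "b" is missing

verify_shard_columns(ok, meta)  # passes silently
try:
    verify_shard_columns(bad, meta)
except ValueError as e:
    err = str(e)
```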
cc @hendrikmakait