P2P shuffle silently dropping data if provided meta includes columns that do not exist in data #8519
Description
I don't believe this can be easily triggered by an end user, but if meta is improperly constructed internally (e.g. in dask-expr during optimization) and passed to the extension, this can cause severe data loss.
The relevant section is `distributed/distributed/shuffle/_shuffle.py`, lines 521 to 531 in a5a6e99:

```python
def _get_output_partition(
    self,
    partition_id: int,
    key: Key,
    **kwargs: Any,
) -> pd.DataFrame:
    try:
        data = self._read_from_disk((partition_id,))
        return convert_shards(data, self.meta)
    except KeyError:
        return self.meta.copy()
```
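A minimal, self-contained sketch of the failure mode (the names below are hypothetical stand-ins, not the actual distributed implementation): a broad `except KeyError` intended to mean "no shards for this partition" also swallows the `KeyError` raised when meta declares a column the payload lacks, so real data is replaced by an empty dataframe.

```python
import pandas as pd


def convert_shards(df: pd.DataFrame, meta: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical stand-in for convert_shards: align each column of `df`
    # with the dtype declared in `meta`. Looking up a column that `df`
    # lacks raises KeyError, mirroring distributed/shuffle/_arrow.py.
    for column in meta.columns:
        actual = df[column].dtype  # KeyError if meta has extra columns
        if actual != meta[column].dtype:
            df[column] = df[column].astype(meta[column].dtype)
    return df


def get_output_partition(data: pd.DataFrame, meta: pd.DataFrame) -> pd.DataFrame:
    try:
        return convert_shards(data, meta)
    except KeyError:
        # Intended to handle "no data for this partition", but it also
        # masks the column-mismatch KeyError raised above.
        return meta.copy()


data = pd.DataFrame({"a": [1, 2, 3]})          # payload has only column "a"
meta = pd.DataFrame({"a": pd.Series(dtype="int64"),
                     "b": pd.Series(dtype="float64")})  # meta declares extra "b"

result = get_output_partition(data, meta)
print(len(result))  # 0 -- the three real rows are silently dropped
```

Running this prints `0`: the mismatch never surfaces as an error, which is exactly why the data loss is silent.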
In the example I am currently debugging, the payload data read from disk contains only a subset of the expected columns, while meta includes additional columns that are not present in the data.
convert_shards then raises a KeyError at `distributed/distributed/shuffle/_arrow.py`, line 78 in b03efee:

```python
actual = df[column].dtype
```
This KeyError causes `_get_output_partition` to fall into the exception handler and return an empty dataframe, silently discarding the partition's data. I haven't spent time figuring out why the exception is handled this way, but if this logic is required, the bare minimum we should do is verify that the columns of meta and the payload data match before transmission.
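The validation suggested above could look roughly like the following sketch (`check_meta_columns` is a hypothetical helper name, not an existing distributed API): compare the column sets of meta and the payload up front and raise loudly instead of letting a downstream `KeyError` be swallowed.

```python
import pandas as pd


def check_meta_columns(data: pd.DataFrame, meta: pd.DataFrame) -> None:
    # Hypothetical pre-transmission check: fail fast with a descriptive
    # error rather than silently dropping data later in the shuffle.
    missing = set(meta.columns) - set(data.columns)
    extra = set(data.columns) - set(meta.columns)
    if missing or extra:
        raise ValueError(
            f"meta/payload column mismatch: missing={sorted(missing)}, "
            f"extra={sorted(extra)}"
        )


data = pd.DataFrame({"a": [1, 2, 3]})
meta = pd.DataFrame({"a": pd.Series(dtype="int64"),
                     "b": pd.Series(dtype="float64")})

try:
    check_meta_columns(data, meta)
except ValueError as exc:
    error_message = str(exc)

print(error_message)  # names the missing column instead of losing the data
```

With a check like this in place, a malformed meta surfaces as an explicit error at shuffle setup rather than as silent data loss in `_get_output_partition`.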

