Closed
Description
If an error occurs while deserializing, regrouping, serializing, or writing data to disk, and the shuffle isn't in "backpressure mode", that data is silently lost and the shuffle still succeeds.
distributed/distributed/shuffle/shuffle_extension.py
Lines 252 to 258 in 7bd6442
    shuffle = await self._get_shuffle(shuffle_id)
    task = asyncio.create_task(shuffle.receive(data))
    if (
        shuffle.multi_file.total_size + sum(map(len, data))
        > shuffle.multi_file.memory_limit
    ):
        await task  # backpressure
When `task` isn't awaited, the asyncio task is leaked. Any error that occurred in it is also lost (besides being logged).
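A minimal, self-contained reproduction of the pattern may help here. It is only a sketch: `receive` and `shuffle_receive` are hypothetical stand-ins for the code above, not the real `distributed` functions, and the `backpressure` flag replaces the memory-limit check.

```python
import asyncio


async def receive(data):
    # Hypothetical stand-in for shuffle.receive: always fails.
    raise ValueError("failed to write shard to disk")


async def shuffle_receive(data, backpressure):
    # Same shape as the snippet above: the task is only awaited
    # when we are in "backpressure mode".
    task = asyncio.create_task(receive(data))
    if backpressure:
        await task  # the error propagates to the caller
    return task


async def main():
    results = {}

    try:
        await shuffle_receive([b"abc"], backpressure=True)
        results["awaited"] = "ok"
    except ValueError:
        results["awaited"] = "error"

    # Without backpressure the caller returns normally even though
    # the background task is going to fail.
    task = await shuffle_receive([b"abc"], backpressure=False)
    results["not_awaited"] = "ok"

    # Wait for the background task and look at what the caller never saw.
    await asyncio.wait([task])
    results["hidden_error"] = repr(task.exception())
    return results


results = asyncio.run(main())
print(results)
```

In the second call the caller has already reported success by the time the failure exists, which is how data can go missing while the shuffle still completes.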
@graingert and I tried to fix this, but we couldn't get something working:
- The dumb way (keep leaking tasks, wrap all of `receive` in a try/except, set `self._exception`, raise `self._exception` in `add_partition`, `get_output_partition`, `inputs_done`) fails tests because of leaking tasks
- The "proper" asyncio way causes a CancelledError to pop out in some unexpected place and seems to shut down the whole worker?
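For discussion, here is a rough sketch of a variant of the first approach that keeps a reference to each background task instead of leaking it. This is not the fix we tried and not the real `Shuffle` class: `ShuffleSketch`, `_receive`, `_on_done` and `_raise_if_failed` are all made-up names, and the sketch does not touch the `CancelledError` / worker shutdown problem from the second bullet.

```python
import asyncio


class ShuffleSketch:
    """Hypothetical stand-in, not the real distributed Shuffle class."""

    def __init__(self):
        self._tasks = set()      # strong references to in-flight tasks
        self._exception = None   # first background failure, if any
        self.received = []

    async def _receive(self, data):
        # Stand-in for deserialize / regroup / write to disk.
        if data is None:
            raise ValueError("bad shard")
        self.received.append(data)

    def _on_done(self, task):
        # Runs when a background task finishes: drop the reference
        # and remember the first exception instead of losing it.
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            if self._exception is None:
                self._exception = task.exception()

    def _raise_if_failed(self):
        if self._exception is not None:
            raise self._exception

    async def add_partition(self, data):
        self._raise_if_failed()
        task = asyncio.create_task(self._receive(data))
        self._tasks.add(task)
        task.add_done_callback(self._on_done)

    async def inputs_done(self):
        # Drain outstanding work, then surface any recorded failure.
        await asyncio.gather(*list(self._tasks), return_exceptions=True)
        self._raise_if_failed()


async def main():
    shuffle = ShuffleSketch()
    await shuffle.add_partition(b"ok")
    await shuffle.add_partition(None)  # fails in the background
    try:
        await shuffle.inputs_done()
    except ValueError as exc:
        return f"shuffle failed: {exc}"
    return "shuffle succeeded"


outcome = asyncio.run(main())
print(outcome)
```

The idea is that no task outlives `inputs_done`, so nothing is leaked, and a background failure turns into a failed shuffle rather than a silent success. Whether this would survive the real test suite is an open question.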
xref #6201