
P2P shuffle returns incorrect results if errors occur while receiving data (also asyncio tasks are leaked) #6277

@gjoseph92


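If an error occurs while deserializing, regrouping, serializing, or writing data to disk, and the shuffle isn't in "backpressure mode" (that is, the incoming data doesn't push the buffered total over the memory limit, so the receive task is never awaited), that data is simply lost and the shuffle still succeeds: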

shuffle = await self._get_shuffle(shuffle_id)
task = asyncio.create_task(shuffle.receive(data))
if (
    shuffle.multi_file.total_size + sum(map(len, data))
    > shuffle.multi_file.memory_limit
):
    await task  # backpressure

When task isn't awaited, the asyncio task is leaked: the handler drops its only reference, so nothing ever checks the task's result, and any error raised inside it is lost (besides being logged).
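The failure mode can be reproduced outside distributed with a toy model of the handler above. This is a minimal sketch under stated assumptions: ToyShuffle, its receive method, and its memory_limit/total_size fields are hypothetical stand-ins for the real shuffle and multi_file objects; only the create_task / conditional-await control flow is taken from the snippet.

```python
from __future__ import annotations

import asyncio


class ToyShuffle:
    """Hypothetical stand-in for the real shuffle object (not distributed's API)."""

    def __init__(self, memory_limit: int) -> None:
        self.memory_limit = memory_limit
        self.total_size = 0  # bytes already buffered

    async def receive(self, data: list[bytes]) -> None:
        await asyncio.sleep(0)  # pretend to do some work first
        # Simulate a failure while deserializing or writing to disk.
        raise OSError("disk write failed")


async def shuffle_receive(shuffle: ToyShuffle, data: list[bytes]) -> asyncio.Task:
    # Same control flow as the handler quoted above.
    task = asyncio.create_task(shuffle.receive(data))
    if shuffle.total_size + sum(map(len, data)) > shuffle.memory_limit:
        await task  # backpressure: the only path where an error reaches the caller
    return task  # returned only so this demo can inspect it afterwards


async def main() -> tuple[bool, BaseException | None, bool]:
    # Under the memory limit: the handler never awaits the task.
    task = await shuffle_receive(ToyShuffle(memory_limit=1_000), [b"abc"])
    handler_succeeded = True  # reached only because no exception was raised
    await asyncio.wait({task})  # let it finish without retrieving its result
    lost_error = task.exception()

    # Over the memory limit: the task is awaited, so the error propagates.
    try:
        await shuffle_receive(ToyShuffle(memory_limit=0), [b"abc"])
        propagated = False
    except OSError:
        propagated = True
    return handler_succeeded, lost_error, propagated


ok, err, propagated = asyncio.run(main())
print(ok, repr(err), propagated)  # True OSError('disk write failed') True
```

In the real handler the task is simply dropped rather than returned, so nothing retrieves its exception; at best it shows up in a log (for example asyncio's "Task exception was never retrieved" message when the task is garbage-collected).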

@graingert and I tried to fix this, but we couldn't get anything working:

  • The dumb way (keep leaking tasks, wrap all of receive in a try/except that sets self._exception, and raise self._exception in add_partition, get_output_partition, and inputs_done) fails tests because of the leaked tasks
  • The "proper" asyncio way causes a CancelledError to pop out in some unexpected place and seems to shut down the whole worker?
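For comparison, here is a minimal sketch of the bookkeeping the first approach needs with the leak removed: strong references to every in-flight receive task, a done callback that records the first exception, a check that re-raises it from later calls, and a final wait for outstanding tasks once inputs are done. ReceiveTracker, spawn, raise_if_failed, and flush are hypothetical names, not distributed's API. This is a standalone asyncio sketch that has not been checked against the worker's own cancellation handling, so it says nothing about the CancelledError problem above.

```python
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any


class ReceiveTracker:
    """Hypothetical helper: tracks background receive tasks and remembers failures."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()  # strong refs, so tasks can't vanish
        self._exception: BaseException | None = None

    def spawn(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task:
        self.raise_if_failed()  # refuse new work once something has failed
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        # Safe to call more than once for the same task.
        self._tasks.discard(task)
        if task.cancelled():
            return  # cancellation is not treated as a data error in this sketch
        exc = task.exception()  # also marks the exception as retrieved
        if exc is not None and self._exception is None:
            self._exception = exc  # keep only the first failure

    def raise_if_failed(self) -> None:
        # Call from every public entry point (e.g. add_partition, get_output_partition).
        if self._exception is not None:
            raise self._exception

    async def flush(self) -> None:
        # Call when inputs are done; assumes no new receives arrive afterwards.
        pending = set(self._tasks)
        if pending:
            done, _ = await asyncio.wait(pending)
            for task in done:
                self._on_done(task)  # record failures without relying on callback order
        self.raise_if_failed()


async def failing_receive() -> None:
    await asyncio.sleep(0)
    raise OSError("disk write failed")


async def ok_receive() -> None:
    await asyncio.sleep(0)


async def demo() -> str:
    tracker = ReceiveTracker()
    tracker.spawn(ok_receive())
    tracker.spawn(failing_receive())
    try:
        await tracker.flush()  # the "inputs done" step surfaces the failure
    except OSError as e:
        return f"shuffle failed: {e}"
    return "shuffle succeeded"


print(asyncio.run(demo()))  # shuffle failed: disk write failed
```

Holding the tasks in a set follows the asyncio documentation's advice for create_task: the event loop keeps only weak references to tasks, so an unreferenced task can disappear mid-execution.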

xref #6201
