Data duplication in P2P shuffling #7324

@fjetter

Description

The current P2P implementation is vulnerable to data duplication if the transfer tasks are ever executed twice.

We do not have any exactly-once execution guarantees and are not eager to implement any; see #6378 for a brief discussion.

Particularly in the case of worker failures, a transfer task may be executed (even successfully) without the scheduler ever learning of it.

This specific case could be dealt with easily by restarting or failing an ongoing shuffle whenever input workers leave while it is running (right now, we only fail the shuffle if output workers leave; alternatively, we could require input workers == output workers).

While I believe there are no other cases in which a task would be executed twice, I am not certain we can guarantee this forever.

Instead, I propose to deal with this by implementing data deduplication on the receiving side. Every shard can be uniquely identified by the tuple (input_partition_id, output_partition_id).
The current implementation would allow easy deduplication by this key by keeping a record of the already received shards.
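To illustrate the idea, here is a minimal sketch of receive-side deduplication keyed by that tuple. This is not the actual `distributed` implementation; the `ShardReceiver` class and its method names are hypothetical, and the real receiver would buffer shards per output partition rather than in a flat list.

```python
class ShardReceiver:
    """Accepts shards and silently drops any that were already received.

    Each shard is uniquely identified by the tuple
    (input_partition_id, output_partition_id), so a set of seen keys
    is enough to detect re-delivery caused by a re-executed transfer task.
    """

    def __init__(self):
        self._seen = set()   # keys of shards received so far
        self.shards = []     # accepted (key, payload) pairs

    def receive(self, input_partition_id, output_partition_id, data):
        key = (input_partition_id, output_partition_id)
        if key in self._seen:
            # Duplicate delivery (e.g. a transfer task ran twice): drop it.
            return False
        self._seen.add(key)
        self.shards.append((key, data))
        return True
```

A second delivery of the same shard is then a no-op, e.g. `receive(0, 1, b"x")` returns `True` the first time and `False` on the retry, leaving exactly one copy buffered.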

This might have a slightly negative impact on transfer performance and might limit our flexibility moving forward in terms of batching. I do not consider these limitations blockers.

cc @mrocklin, @hendrikmakait
