Description
The current P2P implementation is vulnerable to data duplication if the transfer tasks are ever executed twice.
We do not have any exactly-once execution guarantees and are not eager to implement any, see also #6378 for a brief discussion.
Particularly in the case of worker failures, it may be possible for a transfer task to be executed (even successfully) without the scheduler ever learning of it.
This specific case could be easily dealt with by restarting or failing an ongoing shuffle if input workers leave the shuffle while it is running. (Right now, we only require a failure if output workers are leaving; alternatively, we could require that input workers == output workers.)
While I believe there are no other cases where a task would be executed twice, I am not certain if we are able to guarantee this forever.
Instead, I propose to deal with this by implementing data deduplication on the receiving side. Every shard can be uniquely identified by the tuple (input_partition_id, output_partition_id).
The current implementation would allow easy deduplication by this key by keeping a record of the already received shards.
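As a rough illustration of the idea, here is a minimal sketch of receiver-side deduplication keyed on that tuple. The class and method names are hypothetical, not the actual distributed API; it only demonstrates keeping a record of already-received shard keys and dropping duplicates.

```python
class ShardReceiver:
    """Illustrative sketch: deduplicate incoming shards by
    (input_partition_id, output_partition_id). Not real distributed code."""

    def __init__(self) -> None:
        # Record of shard keys we have already accepted
        self._seen: set[tuple[int, int]] = set()
        # Accepted shard payloads, grouped by output partition
        self._shards: dict[int, list[bytes]] = {}

    def receive(self, input_id: int, output_id: int, data: bytes) -> bool:
        """Store a shard unless it was already received.

        Returns True if the shard was accepted, False if it was a
        duplicate (e.g. from a re-executed transfer task).
        """
        key = (input_id, output_id)
        if key in self._seen:
            return False  # duplicate: the transfer task ran twice
        self._seen.add(key)
        self._shards.setdefault(output_id, []).append(data)
        return True
```

For example, if a transfer task is re-executed after a worker failure and resends the same shard, the second `receive` call returns False and the payload is not stored a second time.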
This might have a slightly negative impact on transfer performance and might limit our flexibility around batching going forward. I do not consider these limitations blockers.