[core][cgraph] Introduce fault-tolerant PushMutableObject #58866
ruisearch42 wants to merge 12 commits into ray-project:master from
Conversation
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Hey @dayshah, could you help review this PR? Thanks!
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
```cpp
received_chunks.insert(chunk_key);
reply->set_done(false);
return;
}
```
Future version chunks tracked but data never written
When a future version chunk arrives (version > active version), the code inserts the chunk_key into received_chunks_ but returns immediately without writing the actual data to the backing store. Later, when that version becomes active and retries arrive for those chunks, they're found in received_chunks_ and treated as duplicates, returning early without writing data. The comment says "buffer for later processing" but only the chunk key is tracked—the actual data is discarded. This causes data loss for any chunks that arrive before their version becomes active.
We can simply remove received_chunks.insert(chunk_key) and let the client retry.
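A minimal sketch of the suggested behavior, with illustrative names (`Receiver`, `HandleChunk`, the `Reply` struct) that are not Ray's actual classes: a chunk for a future version is rejected with `done=false` and is not recorded in `received_chunks_`, so the sender's later retry is not misread as a duplicate.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>

// Illustrative reply carrying only the "done" flag from the RPC response.
struct Reply { bool done = false; };

class Receiver {
 public:
  // Returns true iff the chunk's data was written to the backing store.
  bool HandleChunk(int64_t version, uint64_t offset, Reply *reply) {
    const int64_t active_version = highest_completed_ + 1;
    if (version <= highest_completed_) {
      reply->done = true;  // Stale version: already completed, ack so the sender stops.
      return false;
    }
    if (version > active_version) {
      // Future version: do NOT track the chunk key; just ask the client to retry
      // once this version becomes active.
      reply->done = false;
      return false;
    }
    const auto key = std::make_pair(offset, version);
    if (!received_chunks_.insert(key).second) {
      return false;  // Duplicate of a chunk that was already written.
    }
    // ... write the chunk data to the backing store here ...
    return true;
  }

  int64_t highest_completed_ = 0;  // Highest fully written version.

 private:
  std::set<std::pair<uint64_t, int64_t>> received_chunks_;
};
```

With this shape, only chunks whose data actually reaches the store ever enter `received_chunks_`, which removes the data-loss path described above.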
```cpp
// Step 2: Determine if this is active version or future version
int64_t active_version = highest_completed + 1;
bool is_active_version = (request_version == active_version);
```
TOCTOU race allows stale retries to corrupt state
A time-of-check-time-of-use race exists between the two lock acquisitions. The code reads highest_completed and performs the stale version check in the first critical section (lines 159-167), then releases the lock and computes is_active_version outside the lock (lines 169-171). When the second lock is acquired (line 177), the stale is_active_version is used without re-validation. If another thread completes the version between the two lock acquisitions, a stale retry can bypass the staleness check, find an empty received_chunks_ (cleaned up by the completing thread), and trigger a new WriteAcquire for an already-completed version. This can leave the object in an inconsistent state since only one chunk would be written.
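A minimal sketch of the re-validation this comment asks for, under assumed names (`VersionTracker`, `TryBeginWrite`): any state read under the first lock is treated as advisory, and the staleness check is repeated after the second lock is acquired, so a version completed by another thread in between cannot be re-opened.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

class VersionTracker {
 public:
  // Returns false if the request is stale at the moment the write would begin.
  bool TryBeginWrite(int64_t request_version) {
    {
      std::lock_guard<std::mutex> guard(mu_);
      if (request_version <= highest_completed_) return false;  // fast-path reject
    }
    // Lock released here: other threads may complete versions in the gap.
    std::lock_guard<std::mutex> guard(mu_);
    // Re-validate against fresh state instead of trusting the earlier read.
    if (request_version <= highest_completed_) return false;
    const int64_t active_version = highest_completed_ + 1;
    return request_version == active_version;
  }

  void CompleteVersion(int64_t version) {
    std::lock_guard<std::mutex> guard(mu_);
    highest_completed_ = version;
  }

 private:
  std::mutex mu_;
  int64_t highest_completed_ = 0;
};
```

The key point is that `active_version` is computed inside the second critical section, never carried across the unlock.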
Great fix. May I ask how long it will take for this PR to be merged?
dayshah left a comment
Can you write a PR description explaining the logic for making it fault-tolerant? It's pretty complicated...
Also, I'm not sure this is really necessary. I doubt messages are dropped regularly at all, especially for compiled graphs where nodes aren't expected to be across AZs, and in the case they are dropped it might be OK to scrap the whole transfer on both sides and start from scratch. This is what the regular object manager push/pull does on failures, and I don't really want to have differing fault-tolerance logic for each one.
```cpp
Status MutableObjectManager::ReadAcquire(const ObjectID &object_id,
                                         std::shared_ptr<RayObject> &result,
                                         int64_t &version_read,
```
Can you return this with the status, using `StatusOr`, instead of an out param?
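A sketch of what a `StatusOr`-shaped signature could look like. The simplified `StatusOr` template, the `ReadResult` struct, and the trivialized `ReadAcquire` body below are illustrative stand-ins, not Ray's actual types; the point is only that the result travels with the status instead of through reference out-params.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

struct RayObject { std::string data; };  // stand-in for ray::RayObject

// Minimal StatusOr-like wrapper for illustration (Ray has its own StatusOr).
template <typename T>
class StatusOr {
 public:
  explicit StatusOr(std::string error) : error_(std::move(error)) {}  // failure
  explicit StatusOr(T value) : value_(std::move(value)) {}            // success
  bool ok() const { return error_.empty(); }
  const T &value() const { return value_; }
  const std::string &error() const { return error_; }

 private:
  T value_{};
  std::string error_;
};

// Both former out-params bundled into one value type.
struct ReadResult {
  std::shared_ptr<RayObject> object;
  int64_t version_read = 0;
};

// Before: Status ReadAcquire(const ObjectID &, std::shared_ptr<RayObject> &,
//                            int64_t &version_read, ...);
// After (sketched): callers can never observe half-populated out-params on error.
StatusOr<ReadResult> ReadAcquire(bool object_ready) {
  if (!object_ready) return StatusOr<ReadResult>(std::string("NotFound"));
  ReadResult result;
  result.object = std::make_shared<RayObject>(RayObject{"payload"});
  result.version_read = 1;
  return StatusOr<ReadResult>(std::move(result));
}
```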
```cpp
namespace ray {
namespace rpc {
```
We have `RAY_testing_rpc_failure`, which can inject failures for you; you can then test the functionality from Python with the injected failures.
```diff
-    const ray::rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback) {
+    int64_t version,
+    const ray::rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback,
+    int64_t timeout_ms) {
```
Why add timeout as a param here if it's effectively hardcoded to -1 anyway?
Hey, I was on vacation. I will continue with the change and hopefully we can get this in soon!
Hi Rui, any updates on this? Also encountering the same error. Happy to try and test out your branch if that would be helpful; would I need to build from source?
Yes, you need to compile from source, since the changes are in the C++ code.
Any chance this will be fixed? I am encountering the same issue.
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
The fix from the branch seems to resolve this issue. I am running vLLM successfully on 2 nodes with 4x RTX 5090 each.
This nicely fixed the problem I was experiencing. When can it be merged into the main branch?
Description
Currently `PushMutableObject()` does not retry, and `HandlePushMutableObject()` does not handle retries or out-of-order messages. This PR adds that support so the transfer is fault tolerant to network jitter.

Approach -- chunk-level, version-aware retry

- Each write is identified by `PlasmaObjectHeader.version`. The receiver tracks `highest_completed_version_` per object.
- `HandlePushMutableObject` acks stale versions with `done=true`; chunks for versions that are not yet active are discarded with `done=false` so the client retries.
- Received chunks are tracked in a `received_chunks` set hashed by `(offset, version)`, so duplicates are acked without re-writing.
- The sender retries via `INVOKE_RETRYABLE_RPC_CALL`. The callback fires exactly once, either when `done=true` is received or on the first failure.
- When `written_so_far == total_data_size` for the active version, metadata is copied, `WriteRelease` is called, and `highest_completed_version_` is updated.
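The sender-side retry contract described above can be sketched as a small simulation. The names (`PushWithRetry`, `PushResult`, the `done_replies` vector standing in for the receiver's responses) are illustrative, not Ray's API; the point is the exactly-once callback, fired either on `done=true` or when the retry budget runs out.

```cpp
#include <cassert>
#include <functional>
#include <vector>

enum class PushResult { kDone, kExhausted };

// done_replies simulates the receiver's done flag for each attempt; attempts
// beyond the vector are treated as done=false (e.g. dropped replies).
PushResult PushWithRetry(const std::vector<bool> &done_replies,
                         int max_attempts,
                         const std::function<void(PushResult)> &callback) {
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    const bool done = attempt < static_cast<int>(done_replies.size())
                          ? done_replies[attempt]
                          : false;
    if (done) {
      callback(PushResult::kDone);  // fires exactly once, on success
      return PushResult::kDone;
    }
    // done=false: the receiver asked for a retry; loop and push again.
  }
  callback(PushResult::kExhausted);  // also exactly once, on giving up
  return PushResult::kExhausted;
}
```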
Fixes #58426
Additional information