[core][cgraph] Introduce fault-tolerant PushMutableObject#58866

Open
ruisearch42 wants to merge 12 commits intoray-project:masterfrom
ruisearch42:ft_push

Conversation

@ruisearch42
Contributor

@ruisearch42 ruisearch42 commented Nov 21, 2025

Description

Currently, PushMutableObject() does not retry, and HandlePushMutableObject() does not handle retries or out-of-order messages. This PR adds that support to make the path fault tolerant to network jitter.

Approach -- Chunk-level, version-aware retry

  1. Version tracking: Each write epoch has a version from PlasmaObjectHeader.version. The receiver tracks highest_completed_version_ per object.
  2. Classifying versions in HandlePushMutableObjects:

     | Incoming Version | Classification | Action |
     | --- | --- | --- |
     | ≤ highest_completed | Stale | Immediately reply done=true, discard |
     | = highest_completed + 1 | Active | Write to backing store |
     | > highest_completed + 1 | Future | Buffer the chunk key, reply done=false |

  3. Chunk-level idempotency: Duplicate chunks are detected via a received_chunks set keyed by (offset, version) and are not re-written.
  4. Sender-side retry: Each chunk RPC uses INVOKE_RETRYABLE_RPC_CALL. The callback fires exactly once, either when done=true is received or on the first failure.
  5. Completion: When written_so_far == total_data_size for the active version, metadata is copied, WriteRelease is called, and highest_completed_version_ is updated.

Related issues

Fixes #58426


@ruisearch42 ruisearch42 changed the title [cgraph] Introduce fault-tolerant PushMutableObject [core][cgraph] Introduce fault-tolerant PushMutableObject Nov 21, 2025
@ruisearch42 ruisearch42 added the go add ONLY when ready to merge, run all tests label Nov 21, 2025
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
ruisearch42 and others added 4 commits December 5, 2025 00:51
@ruisearch42 ruisearch42 marked this pull request as ready for review December 18, 2025 21:58
@ruisearch42 ruisearch42 requested a review from a team as a code owner December 18, 2025 21:58
@ruisearch42
Contributor Author

Hey @dayshah, could you help review this PR? Thanks!

@ray-gardener ray-gardener bot added the core Issues that should be addressed in Ray Core label Dec 19, 2025
received_chunks.insert(chunk_key);
reply->set_done(false);
return;
}

Future version chunks tracked but data never written

When a future version chunk arrives (version > active version), the code inserts the chunk_key into received_chunks_ but returns immediately without writing the actual data to the backing store. Later, when that version becomes active and retries arrive for those chunks, they're found in received_chunks_ and treated as duplicates, returning early without writing data. The comment says "buffer for later processing" but only the chunk key is tracked—the actual data is discarded. This causes data loss for any chunks that arrive before their version becomes active.


Contributor

@jeffreywang-anyscale jeffreywang-anyscale Feb 22, 2026


We can simply remove received_chunks.insert(chunk_key) and let the client retry.


// Step 2: Determine if this is active version or future version
int64_t active_version = highest_completed + 1;
bool is_active_version = (request_version == active_version);

TOCTOU race allows stale retries to corrupt state

A time-of-check-time-of-use race exists between the two lock acquisitions. The code reads highest_completed and performs the stale version check in the first critical section (lines 159-167), then releases the lock and computes is_active_version outside the lock (lines 169-171). When the second lock is acquired (line 177), the stale is_active_version is used without re-validation. If another thread completes the version between the two lock acquisitions, a stale retry can bypass the staleness check, find an empty received_chunks_ (cleaned up by the completing thread), and trigger a new WriteAcquire for an already-completed version. This can leave the object in an inconsistent state since only one chunk would be written.


@daiping8
Contributor

Great fix. May I ask how long it will take for this PR to be merged?

Contributor

@dayshah dayshah left a comment


Can you write a PR description explaining the logic for making it fault-tolerant? It's pretty complicated...

Also I'm not sure if this is really necessary. I doubt messages are dropped regularly at all, especially for compiled graphs where nodes aren't expected to be across AZs, and in the case they are dropped it might be OK to scrap the whole transfer on both sides and start from scratch. This is what the regular object manager push/pull does on failures, and I don't really want to have differing fault tolerance logic for each one.


Status MutableObjectManager::ReadAcquire(const ObjectID &object_id,
std::shared_ptr<RayObject> &result,
int64_t &version_read,
Contributor


can you return this together with the status via StatusOr, instead of an out param?


namespace ray {
namespace rpc {

Contributor


We have RAY_testing_rpc_failure which can inject failures for you and then can test functionality from python with the injected failures.

const ray::rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback) {
int64_t version,
const ray::rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback,
int64_t timeout_ms) {
Contributor


why add timeout as a param here if it's effectively hardcoded to -1 anyway?

@ruisearch42
Contributor Author

> Great fix. May I ask how long it will take for this PR to be merged?

Hey I was on vacation. I will continue with the change and hopefully we can get this in soon!

@sdtblckgov

Hi Rui, any updates on this? Also encountering the same error. Happy to try and test out your branch if that would be helpful, would I need to build from source?

@daiping8
Contributor

> Hi Rui, any updates on this? Also encountering the same error. Happy to try and test out your branch if that would be helpful, would I need to build from source?

Yes, you need to compile from source, since the changes are in the C++ code.

@EthanAndersonUSA

Any chance this will be fixed? I am encountering the same issue.

@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Feb 12, 2026
@EthanAndersonUSA

The fix from the branch seems to resolve this issue. I am running vLLM successfully on 2 nodes with 4x RTX 5090 each.

@github-actions github-actions bot added unstale A PR that has been marked unstale. It will not get marked stale again if this label is on it. and removed stale The issue is stale. It will be closed within 7 days unless there are further conversation labels Feb 12, 2026
@jeffreywang-anyscale jeffreywang-anyscale self-assigned this Feb 20, 2026
@Kelang-Tian

This nicely fixed the problem I was experiencing. When can it be merged into the main branch?


Labels

- core: Issues that should be addressed in Ray Core
- go: add ONLY when ready to merge, run all tests
- unstale: A PR that has been marked unstale. It will not get marked stale again if this label is on it.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Compiled Graph] Unstable Network Conditions Cause Hang with vLLM v1 API in Cross-Node Pipeline Parallelism

8 participants