[core] Avoid resubmitted actor tasks from hanging indefinitely#51904

Merged
jjyao merged 25 commits into ray-project:master from kevin85421:20250329-devbox1-tmux4-ray
Apr 8, 2025

Conversation

@kevin85421
Member

@kevin85421 kevin85421 commented Apr 2, 2025

Why are these changes needed?

We observed an issue where ray.wait hangs indefinitely in the following case:

  • The driver sends a Ray task (task1) with a streaming generator enabled to actor A, and the task's sequence number is 0.
  • Actor A receives task1.
  • Actor A somehow becomes unavailable from the driver's perspective.
  • The driver resubmits task1 with the same sequence number (i.e., seq_no=0).
  • Actor A finishes the first task1 and sends a ReportGeneratorItemReturns RPC to the driver.
  • The driver replies with a Status::NotFound to actor A because the driver has already made another attempt, and the previous attempt is now outdated.

    ```cpp
    if (it->second.spec.AttemptNumber() > attempt_number) {
      // Generator task reports can arrive at any time. If the first attempt
      // fails, we may receive a report from the first executor after the
      // second attempt has started. In this case, we should ignore the first
      // attempt.
      execution_signal_callback(
          Status::NotFound("Stale object reports from the previous attempt."), -1);
      return false;
    }
    ```

  • Actor A receives the second task1. However, the actor will not execute the request because the sequence number is the same.
  • The driver keeps resubmitting task1 with seq_no=0, while the actor repeatedly cancels the tasks.
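
The retry loop above can be illustrated with a toy simulation. This is a hypothetical Python sketch of the two checks involved (the actor's seq_no de-duplication and the driver's stale-attempt rejection), not Ray's actual classes:

```python
class Actor:
    """Toy model: the actor drops any request whose seq_no it already saw."""

    def __init__(self):
        self.seen_seq_nos = set()

    def push_task(self, seq_no):
        if seq_no in self.seen_seq_nos:
            return "cancelled"        # duplicate: never executed
        self.seen_seq_nos.add(seq_no)
        return "executing"


class Driver:
    """Toy model: resubmission reuses seq_no=0 but bumps the attempt number."""

    def __init__(self):
        self.attempt_number = 0

    def resubmit(self):
        self.attempt_number += 1
        return 0                      # same seq_no every time (pre-fix behavior)

    def handle_generator_report(self, report_attempt):
        # Mirrors the AttemptNumber() > attempt_number check above.
        if report_attempt < self.attempt_number:
            return "NotFound"         # stale report from a previous attempt
        return "OK"


actor, driver = Actor(), Driver()
assert actor.push_task(0) == "executing"           # first submission runs

# Network blip: the driver resubmits; the actor keeps cancelling forever,
# and the first attempt's reports are now rejected as stale.
for _ in range(3):
    seq_no = driver.resubmit()
    assert actor.push_task(seq_no) == "cancelled"
    assert driver.handle_generator_report(0) == "NotFound"
```

Neither side makes progress: the actor never re-executes, and the driver never accepts the first attempt's results, which is the hang.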

Three possible solutions

Solution 1: Always increment seq_no during resubmissions

  • Pros:
    • Simplest implementation.
    • Solves a slightly narrower problem than Solution 2, but the implementation is much simpler.
  • Cons:
    • Doesn't guarantee the execution order of retryable actor tasks.
  • Examples
    • Case 1
      • The driver submits task A (seq_no=0) to an actor, and B, C, D are in the queue.
      • Actor unavailable.
      • The actor doesn’t receive task A.
      • The network becomes healthy.
      • The driver resubmits task A (seq_no=4) and updates client_processed_up_to_.
      • The actor executes B, C, D, A.
    • Case 2
      • The driver submits task A (seq_no=0) to an actor, and B, C, D are in the queue.
      • The actor receives task A.
      • Actor unavailable.
      • The network becomes healthy.
      • The driver resubmits task A (seq_no=4).
      • The actor executes A, B, C, D, A.
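
Solution 1's Case 2 can be sketched in the same toy model (hypothetical Python, not Ray's implementation): the driver draws a fresh seq_no from its send counter on every resubmission, so the actor's de-duplication no longer drops the retry, at the cost of A possibly running twice.

```python
class Actor:
    def __init__(self):
        self.seen_seq_nos = set()
        self.executed = []

    def push_task(self, name, seq_no):
        if seq_no in self.seen_seq_nos:
            return "cancelled"
        self.seen_seq_nos.add(seq_no)
        self.executed.append(name)
        return "executing"


class Driver:
    def __init__(self):
        self.next_seq_no = 0

    def submit(self, actor, name):
        # Solution 1: every (re)submission gets a fresh sequence number.
        seq_no = self.next_seq_no
        self.next_seq_no += 1
        return actor.push_task(name, seq_no)


actor, driver = Actor(), Driver()
driver.submit(actor, "A")            # seq_no=0: actor receives and runs A
for t in ("B", "C", "D"):
    driver.submit(actor, t)          # seq_no=1..3: queued tasks run

# The driver believes the actor is unavailable and resubmits A with seq_no=4;
# the actor accepts it instead of cancelling it.
assert driver.submit(actor, "A") == "executing"
assert actor.executed == ["A", "B", "C", "D", "A"]   # A runs twice (Case 2)
```

This illustrates the trade-off named in the cons: liveness is restored, but ordering becomes at-least-once rather than exactly-in-order.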

Solution 2: Resubmit with the same seq_no first. If this fails, resubmit with a new seq_no

  • Pros:
    • If the actor hasn't received any of the tasks, resubmission preserves the original execution order.
  • Cons:
    • Doesn't guarantee the execution order of actor tasks if the actor has already received some tasks before the network issues.
    • The complexity is much higher than Solution 1. A previous PR attempted to fix this but was never merged (Introducing StaleTaskError #46705).
  • Examples
    • Case 1
      • The driver submits task A (seq_no=0) to an actor, and B, C, D are in the queue.
      • Actor unavailable.
      • The actor doesn’t receive task A.
      • The network becomes healthy.
      • The driver resubmits task A (seq_no=0).
      • The actor executes A, B, C, D.
    • Case 2
      • The driver submits task A (seq_no=0) to an actor, and B, C, D are in the queue.
      • The actor receives task A.
      • Actor unavailable.
      • The network becomes healthy.
      • The driver resubmits task A (seq_no=0).
      • The actor cancels the second task A.
      • The driver resubmits task A (seq_no=4).
      • The actor executes A, B, C, D, A.

Solution 3: The actor caches the results until the driver confirms they are no longer needed.

  • Pros
    • Guarantees the execution order.
  • Cons
    • Complex.
    • Increases the risk of OOM errors because actors need to cache the results. This could be mitigated with some kind of backpressure (e.g., if more than N results are pending due to network instability, pause generator execution), but that increases the complexity further.
  • Examples
    • Case 1
      • The driver submits task A (seq_no=0) to an actor, and B, C, D are in the queue.
      • Actor unavailable.
      • The actor doesn’t receive task A.
      • The network becomes healthy.
      • The driver resubmits task A (seq_no=0).
      • The actor executes A, B, C, D.
    • Case 2
      • The driver submits task A (seq_no=0) to an actor, and B, C, D are in the queue.
      • The actor receives task A.
      • Actor unavailable.
      • The network becomes healthy.
      • The driver resubmits task A (seq_no=0).
      • The actor executes A, B, C, D. When the actor receives the second task A, it returns the cached result to the driver.
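
Solution 3's replay behavior can be sketched as follows (hypothetical Python; names like `CachingActor` and `ack` are illustrative, not Ray APIs): the actor executes a de-duplicated task once, caches the result by task id, and replays the cached result for any later duplicate until the driver acknowledges it.

```python
class CachingActor:
    def __init__(self):
        self.cache = {}                  # task_id -> cached result

    def push_task(self, task_id, fn):
        if task_id in self.cache:        # duplicate: replay instead of cancel
            return self.cache[task_id]
        result = fn()                    # first submission: execute once
        self.cache[task_id] = result
        return result

    def ack(self, task_id):
        # The driver confirms it received the result; the actor can free it.
        # This is the step whose backlog can cause the OOM risk noted above.
        self.cache.pop(task_id, None)


actor = CachingActor()
first = actor.push_task("task1", lambda: 42)    # executed
second = actor.push_task("task1", lambda: 99)   # not executed: cached replay
assert first == second == 42
actor.ack("task1")                              # driver releases the cache
assert "task1" not in actor.cache
```

The cache is exactly what grows without bound under prolonged network instability, which is why the cons mention backpressure on pending results.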

Conclusion

None of the three solutions guarantees the execution order once lineage reconstruction is taken into consideration. Solution 1 was chosen because it is the simplest.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

To speed up the reproduction, update these two configs:

```cpp
// old: 300s, new: 30s
RAY_CONFIG(int64_t, grpc_client_keepalive_time_ms, 30000)
// old: 120s, new: 15s
RAY_CONFIG(int64_t, grpc_client_keepalive_timeout_ms, 15000)
```

The reproduction uses iptables to block the connection for the HandlePushTask RPC between the driver and the actor, so the driver can't receive the RPC reply from the actor after the network policies are applied.

The driver thinks the actor is "unavailable" because of the gRPC keepalive watchdog timeout. In the worst case, this happens after grpc_client_keepalive_time_ms + grpc_client_keepalive_timeout_ms (i.e., 45 s in the configuration above).
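
As a sanity check on that bound (plain arithmetic, using the repro values above):

```python
# Worst case: the channel goes idle just after a ping, so the client waits a
# full keepalive interval before pinging, then a full timeout for the reply.
grpc_client_keepalive_time_ms = 30_000     # repro value (default 300 s)
grpc_client_keepalive_timeout_ms = 15_000  # repro value (default 120 s)

worst_case_ms = grpc_client_keepalive_time_ms + grpc_client_keepalive_timeout_ms
assert worst_case_ms == 45_000             # 45 s, as stated above
```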

```shell
RAY_BACKEND_LOG_LEVEL=debug python3 test.py
./network_policy.sh -A

# Wait until the actor is unavailable. Run the following command until the
# core-driver log file has some related logs.
grep -rn "UNAVAIL"

# Remove the network policy
./network_policy.sh -D

# Wait until the first actor task finishes and the io context starts to
# handle the resubmitted task.
```

Without this PR

ray.wait gets stuck forever.


With this PR

The resubmitted task is executed after the old one finishes, and the driver receives 0–7 from the first task and 8–19 from the resubmitted one.


@kevin85421 kevin85421 added the go (add ONLY when ready to merge, run all tests) label Apr 2, 2025
@kevin85421
Member Author

Next steps:

  1. Be more conservative about failing in-flight requests.
  2. Consider shutting down the running task if its output will no longer be consumed.
  3. How to add a test? Currently, it requires setting up iptables.

@kevin85421 kevin85421 marked this pull request as ready for review April 2, 2025 09:02
@jjyao
Contributor

jjyao commented Apr 2, 2025

How to add a test? Currently, it requires to setup iptables.

Check out rpc_chaos.cc

Contributor


Why do we still keep this case instead of always updating seq_no?

Member Author


I think both should be fine. If an actor died, the new actor should not reject the tasks with the same seq_no.

Collaborator


In the case of ACTOR_DIED, it would maintain the original order of execution in some cases (if multiple tasks need to get resubmitted)

Very very minor edge case though -- I would be fine with dropping this as well to unify the failure handling.

@kevin85421
Member Author

Check out rpc_chaos.cc

It doesn't seem to support dynamic configuration.

@jjyao
Contributor

jjyao commented Apr 2, 2025

It doesn't seem to support dynamic configuration.

What dynamic configuration do you need? This is a simple chaos framework we wrote, so we can enhance it if it's missing features.

@kevin85421
Member Author

There’s another issue related to task retrying and actor restarts. I may create a separate PR to fix it, since the current PR description already contains too much information.

@kevin85421 kevin85421 changed the title [core] Avoid resubmitted actor tasks from hanging indefinitely [WIP][core] Avoid resubmitted actor tasks from hanging indefinitely Apr 3, 2025
@kevin85421 kevin85421 marked this pull request as draft April 3, 2025 08:28
Signed-off-by: kaihsun <kaihsun@anyscale.com>
@kevin85421 kevin85421 force-pushed the 20250329-devbox1-tmux4-ray branch from 6e77fa6 to 55607b9 Compare April 4, 2025 04:46
@jjyao jjyao enabled auto-merge (squash) April 8, 2025 22:06
@dragongu
Contributor

dragongu commented Apr 21, 2025

@kevin85421 @jjyao "Driver resubmits task1 again with the same sequence number (i.e. seq_no=0)"

Could the driver's resubmission conflict with an actor task's failure retry (because the actor restarted)?

After I used this commit and ran a very simple Ray Data job (where the worker pod would frequently crash due to being preempted), an error was thrown:
task_manager.cc:1412: Check failed: it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT , task ID = 6ff6fe559f63b1b8b015cbfb3a695db0935ce25820000000, status = 1

@jjyao
Contributor

jjyao commented Apr 22, 2025

@dragongu

Could you provide a repro? Which exact commit were you using?

@kevin85421
Member Author

Could the resubmission of the driver conflict with actor task failed retry(because the actor restarted) ?

What does "conflict" refer to?

A reproduction would be helpful.

@dragongu
Contributor

dragongu commented Apr 22, 2025

@dragongu

Could you provide a repro? Which exact commit you were using?

@jjyao Use the master branch

@dragongu
Contributor

dragongu commented Apr 22, 2025

Could the resubmission of the driver conflict with actor task failed retry(because the actor restarted) ?

what does conflict refer to?

A reproduction would be helpful.

@kevin85421
The scenario is like this:

It is a simple Ray Data job, read_parquet -> map_batch -> map_batch -> write_parquet, with workers continuously terminating (pods are forcibly preempted) and new workers joining. The stack trace is:

```
[2025-04-20 16:41:18,448 C 818365 821947] task_manager.cc:1416: Check failed: it->second.GetStatus() == rpc::TaskStatus::PENDING_NODE_ASSIGNMENT , task ID = eccd8f51d8b690fddce51415fb4280436e4d986741000000, status = 11
/usr/local/lib/python3.10/dist-packages/ray/_raylet.so(+0xaf0568) [0x7f9aa0f94568] ray::core::ActorTaskSubmitter::PushActorTask()
/usr/local/lib/python3.10/dist-packages/ray/_raylet.so(_ZN3ray4core11TaskManager27MarkTaskWaitingForExecutionERKNS_6TaskIDERKNS_6NodeIDERKNS_8WorkerIDE+0x3b5) [0x7f9aa0faaf25] ray::core::TaskManager::MarkTaskWaitingForExecution()
/usr/local/lib/python3.10/dist-packages/ray/_raylet.so(_ZN3ray6RayLogD1Ev+0x111) [0x7f9aa1a6fd81] ray::RayLog::~RayLog()
/usr/local/lib/python3.10/dist-packages/ray/_raylet.so(+0x15c7417) [0x7f9aa1a6b417] ray::operator<<()
*** StackTrace Information ***
```

@jjyao
Contributor

jjyao commented Apr 22, 2025

Could you tell me the exact commit? It seems the log line number doesn't match master.

@dragongu
Contributor

Could you tell me the exact commit? Seems the log line number doesn't match the master.

Some irrelevant debug logs

@dragongu
Contributor

dragongu commented Apr 22, 2025

@jjyao @kevin85421
Hi, I suspect that the failed check might occur in the following scenario:

  1. The actor restarts.
  2. The actor task fails because the actor is unavailable.
  3. The CoreWorker retries the task in a scheduled thread (InternalHeartbeat); the task runs successfully and updates its status to 11 (FINISHED).
  4. The actor becomes alive again, and TaskManager::MarkTaskWaitingForExecution() is executed, which then triggers the check failure.
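
The suspected race above can be sketched as a toy state machine (hypothetical Python; the status constants mirror the numbers quoted in the logs, and this is not Ray's real code):

```python
PENDING_NODE_ASSIGNMENT = 1   # illustrative value
FINISHED = 11                 # value quoted in the crash log


class TaskManager:
    def __init__(self):
        self.status = PENDING_NODE_ASSIGNMENT

    def retry_from_internal_heartbeat(self):
        # Step 3: the scheduled retry runs the task to completion first.
        self.status = FINISHED

    def mark_task_waiting_for_execution(self):
        # Step 4: models the CHECK that crashes in task_manager.cc.
        if self.status != PENDING_NODE_ASSIGNMENT:
            raise AssertionError(f"Check failed: status = {self.status}")


tm = TaskManager()
tm.retry_from_internal_heartbeat()        # retry finishes the task...
try:
    tm.mark_task_waiting_for_execution()  # ...then the actor comes alive
    crashed = False
except AssertionError:
    crashed = True
assert crashed                            # the stale transition is fatal
```

If this reading is right, the fix would need to make the transition tolerant of a task that has already finished, rather than asserting on its status.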
