[core][refactor] move NodeManager::KillWorker to WorkerInterface::KillAsync for better testability#54068
Conversation
6dac598 to
d7132a3
Compare
d7132a3 to
c7532d2
Compare
There was a problem hiding this comment.
Pull Request Overview
This PR refactors worker termination logic by moving NodeManager::KillWorker into the WorkerInterface and updating all consumers accordingly. It implements a new Kill method on Worker, updates NodeManager to call it, refreshes mocks and tests to support the change, and removes the old KillWorker helper.
- Introduces
WorkerInterface::Killand implements it inWorker. - Updates
NodeManagerand tests to use the newKillAPI. - Adjusts GCS client usage in local object manager tests and BUILD dependencies for mocks.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| src/ray/raylet/worker.h | Added Kill to WorkerInterface |
| src/ray/raylet/worker.cc | Implemented Worker::Kill with graceful and force modes |
| src/ray/raylet/test/util.h | Expanded MockWorker to track Kill invocations |
| src/ray/raylet/test/node_manager_test.cc | Updated NodeManager tests to use mock callbacks rather than real threads/processes |
| src/ray/raylet/test/local_object_manager_test.cc | Switched to MockGcsClient and updated pointer type |
| src/ray/raylet/node_manager.h | Removed old KillWorker and exposed HandleRequestWorkerLease |
| src/ray/raylet/node_manager.cc | Replaced calls to KillWorker with worker->Kill |
| src/mock/ray/raylet/worker.h | Added mock for new Kill method |
| BUILD.bazel | Added :ray_mock to test dependencies |
Comments suppressed due to low confidence (4)
src/ray/raylet/worker.h:49
- Add a doc comment for the new
Killmethod inWorkerInterfaceto describe its behavior, parameters, and expected side effects for better maintainability.
virtual void Kill(instrumented_io_context &io_service, bool force = false) = 0;
src/ray/raylet/node_manager.h:259
- [nitpick] Consider passing
requestby const reference (const rpc::RequestWorkerLeaseRequest&) to avoid an unnecessary copy of the protobuf message.
void HandleRequestWorkerLease(rpc::RequestWorkerLeaseRequest request,
src/ray/raylet/test/local_object_manager_test.cc:28
- Verify that the include path for the mock GCS client is correct—an incorrect path here could cause test compilation failures.
#include "mock/ray/gcs/gcs_client/gcs_client.h"
BUILD.bazel:686
- [nitpick] Ensure that the
ray_mocktarget is defined and described in the BUILD file or a README so that its purpose and dependencies are clear to future maintainers.
":ray_mock",
…ter testability Signed-off-by: Rueian <rueian@anyscale.com> Signed-off-by: Rueian <rueiancsie@gmail.com>
…ter testability Signed-off-by: Rueian <rueian@anyscale.com> Signed-off-by: Rueian <rueiancsie@gmail.com>
c7532d2 to
68864a3
Compare
|
Hi @israbbani, this PR follows your suggestions to make |
src/ray/raylet/worker.h
Outdated
| /// \param force true to kill immediately, false to give time for the worker to | ||
| /// clean up and exit gracefully. | ||
| /// \return Void. | ||
| void Kill(instrumented_io_context &io_service, bool force = false); |
There was a problem hiding this comment.
Document if Kill is idempotent
I think it is currently not because the PID could be reused after the process exits.
There was a problem hiding this comment.
While I think there isn't much we can do to address the PID reuse problem, making this Kill idempotent is simple. All we need is to set and check a flag if the method is invoked or not.
There was a problem hiding this comment.
I have made the Kill idempotent by adding the new killed_ flag, but I am figuring out if we could merge MarkDead with Kill and share their flags; otherwise, their semantics seem to be overlapping a bit. If we can, I will send a new PR for merging them.
There was a problem hiding this comment.
Sounds good, I was going to suggest the same :)
Signed-off-by: Rueian <rueian@anyscale.com>
d6ed584 to
1381cc2
Compare
src/ray/raylet/worker.cc
Outdated
|
|
||
| bool Worker::IsDead() const { return dead_; } | ||
|
|
||
| void Worker::Kill(instrumented_io_context &io_service, bool force) { |
There was a problem hiding this comment.
nit: can we name this KillAsync to make it clear that when the function returns, the worker may still be alive?
src/ray/raylet/worker.cc
Outdated
| if (killed_) { // TODO(rueian): could we just reuse the dead_ flag? | ||
| return; // If the worker is already killed by this Kill method, do nothing. | ||
| } | ||
| killed_ = true; |
There was a problem hiding this comment.
need to use atomic fetch_add here
src/ray/raylet/worker.cc
Outdated
| RayConfig::instance().kill_worker_timeout_milliseconds()); | ||
| retry_timer->expires_from_now(retry_duration); | ||
| retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) { | ||
| RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->GetProcess().GetId(); |
There was a problem hiding this comment.
| RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->GetProcess().GetId(); | |
| RAY_LOG(DEBUG) << "Worker with PID=" << worker->GetProcess().GetId() << " did not exit after " << timeout << "ms, force killing with SIGKILL."; |
this should be at least INFO-level IMO, maybe WARNING. if you agree please update it
There was a problem hiding this comment.
except this will mean it's logged every time on windows because there is no graceful kill above on windows... conditional log based on platform?
israbbani
left a comment
There was a problem hiding this comment.
Small nits. Thanks for being so proactive about this. We should make the flag atomic and follow up with merging it with the dead_ flag to provide cleaner semantics like you suggested.
src/ray/raylet/worker.h
Outdated
| /// Kill the worker. This is idempotent, that is, no effect starting from the second | ||
| /// call. \param io_service for scheduling the graceful period timer. \param force true | ||
| /// to kill immediately, false to give time for the worker to clean up and exit | ||
| /// gracefully. \return Void. | ||
| void Kill(instrumented_io_context &io_service, bool force = false); |
There was a problem hiding this comment.
| /// Kill the worker. This is idempotent, that is, no effect starting from the second | |
| /// call. \param io_service for scheduling the graceful period timer. \param force true | |
| /// to kill immediately, false to give time for the worker to clean up and exit | |
| /// gracefully. \return Void. | |
| void Kill(instrumented_io_context &io_service, bool force = false); | |
| /// Kill the worker process. This is idempotent | |
| /// \param io_service for scheduling the graceful period timer. | |
| /// \param force true to kill immediately, false to give time for the worker to clean up and exit gracefully. | |
| /// \return Void. | |
| void Kill(instrumented_io_context &io_service, bool force = false); |
src/ray/raylet/worker.cc
Outdated
|
|
||
| bool Worker::IsDead() const { return dead_; } | ||
|
|
||
| void Worker::Kill(instrumented_io_context &io_service, bool force) { |
src/ray/raylet/worker.cc
Outdated
| if (killed_) { // TODO(rueian): could we just reuse the dead_ flag? | ||
| return; // If the worker is already killed by this Kill method, do nothing. | ||
| } | ||
| killed_ = true; |
src/ray/raylet/worker.h
Outdated
| /// Whether the worker is dead. | ||
| bool dead_; | ||
| /// Whether the worker is killed by the Kill method. | ||
| bool killed_; |
There was a problem hiding this comment.
Should this be made atomic and be named killing_ instead of killed_. Technically, the process isn't killed successfully when this is set to true.
| return; | ||
| } | ||
| #ifdef _WIN32 | ||
| // TODO(mehrdadn): implement graceful process termination mechanism |
There was a problem hiding this comment.
@edoakes is this worth leaving in? Either we create an issue and make open it up for a windows community contribution or we should just remove the TODO.
There was a problem hiding this comment.
I agree -- issue for a windows community contribution would be great!
src/ray/raylet/worker.cc
Outdated
| // If we're just cleaning up a single worker, allow it some time to clean | ||
| // up its state before force killing. The client socket will be closed | ||
| // and the worker struct will be freed after the timeout. |
There was a problem hiding this comment.
| // If we're just cleaning up a single worker, allow it some time to clean | |
| // up its state before force killing. The client socket will be closed | |
| // and the worker struct will be freed after the timeout. | |
| // Attempt to gracefully shutdown the worker before force killing it. |
Signed-off-by: Rueian <rueian@anyscale.com>
| }) { | ||
| RayConfig::instance().initialize(R"({ | ||
| "raylet_liveness_self_check_interval_ms": 100, | ||
| "kill_worker_timeout_milliseconds": 10 |
|
and can you also file a GH issue for the windows graceful process shutdown? |
No problem. Filed here #54374. |
…lAsync for better testability (#54068) Following the suggestion in #53562 (comment), this PR moves `NodeManager::KillWorker` to `WorkerInterface::Kill` so that we can mock the method for testing if it is invoked or not, instead of spawning a real process to see if it is killed or not. As a side effect, this will also eliminate the confusion between `NodeManager::KillWorker` and `NodeManager::DestroyWorker`. --------- Signed-off-by: Rueian <rueian@anyscale.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
…ace::KillAsync for better testability (ray-project#54068)" This reverts commit 4d9b323.
## Why are these changes needed? Following [the previous discussion](#54068 (comment)), this PR merges the flags used by `Worker::MarkDead` and `Worker::KillAsync`. Currently, `Worker::MarkDead` will only be called after we ask the worker to shutdown via the `Exit` RPC. After the `Exit` RPC, the worker should shut down by itself. This change essentially prevents sending `SIGTERM` from the node manager to a worker once its `Worker::MarkDead` method has been called, ensuring that we don't interrupt its shutdown process. ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Rueian <rueian@anyscale.com> Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
…oject#54377) ## Why are these changes needed? Following [the previous discussion](ray-project#54068 (comment)), this PR merges the flags used by `Worker::MarkDead` and `Worker::KillAsync`. Currently, `Worker::MarkDead` will only be called after we ask the worker to shutdown via the `Exit` RPC. After the `Exit` RPC, the worker should shut down by itself. This change essentially prevents sending `SIGTERM` from the node manager to a worker once its `Worker::MarkDead` method has been called, ensuring that we don't interrupt its shutdown process. ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Rueian <rueian@anyscale.com> Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com> Signed-off-by: alimaazamat <alima.azamat2003@gmail.com>
…lAsync for better testability (ray-project#54068) Following the suggestion in ray-project#53562 (comment), this PR moves `NodeManager::KillWorker` to `WorkerInterface::Kill` so that we can mock the method for testing if it is invoked or not, instead of spawning a real process to see if it is killed or not. As a side effect, this will also eliminate the confusion between `NodeManager::KillWorker` and `NodeManager::DestroyWorker`. --------- Signed-off-by: Rueian <rueian@anyscale.com> Signed-off-by: ChanChan Mao <chanchanmao1130@gmail.com>
…oject#54377) ## Why are these changes needed? Following [the previous discussion](ray-project#54068 (comment)), this PR merges the flags used by `Worker::MarkDead` and `Worker::KillAsync`. Currently, `Worker::MarkDead` will only be called after we ask the worker to shutdown via the `Exit` RPC. After the `Exit` RPC, the worker should shut down by itself. This change essentially prevents sending `SIGTERM` from the node manager to a worker once its `Worker::MarkDead` method has been called, ensuring that we don't interrupt its shutdown process. ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Rueian <rueian@anyscale.com> Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com> Signed-off-by: Krishna Kalyan <krishnakalyan3@gmail.com>
…oject#54377) ## Why are these changes needed? Following [the previous discussion](ray-project#54068 (comment)), this PR merges the flags used by `Worker::MarkDead` and `Worker::KillAsync`. Currently, `Worker::MarkDead` will only be called after we ask the worker to shutdown via the `Exit` RPC. After the `Exit` RPC, the worker should shut down by itself. This change essentially prevents sending `SIGTERM` from the node manager to a worker once its `Worker::MarkDead` method has been called, ensuring that we don't interrupt its shutdown process. ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Rueian <rueian@anyscale.com> Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com> Signed-off-by: jugalshah291 <shah.jugal291@gmail.com>
…lAsync for better testability (ray-project#54068) Following the suggestion in ray-project#53562 (comment), this PR moves `NodeManager::KillWorker` to `WorkerInterface::Kill` so that we can mock the method for testing if it is invoked or not, instead of spawning a real process to see if it is killed or not. As a side effect, this will also eliminate the confusion between `NodeManager::KillWorker` and `NodeManager::DestroyWorker`. --------- Signed-off-by: Rueian <rueian@anyscale.com> Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
…oject#54377) ## Why are these changes needed? Following [the previous discussion](ray-project#54068 (comment)), this PR merges the flags used by `Worker::MarkDead` and `Worker::KillAsync`. Currently, `Worker::MarkDead` will only be called after we ask the worker to shutdown via the `Exit` RPC. After the `Exit` RPC, the worker should shut down by itself. This change essentially prevents sending `SIGTERM` from the node manager to a worker once its `Worker::MarkDead` method has been called, ensuring that we don't interrupt its shutdown process. ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Rueian <rueian@anyscale.com> Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com> Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
Why are these changes needed?
Following the suggestion in #53562 (comment), this PR moves
NodeManager::KillWorkertoWorkerInterface::Killso that we can mock the method for testing if it is invoked or not, instead of spawning a real process to see if it is killed or not.As a side effect, this will also eliminate the confusion between
NodeManager::KillWorkerandNodeManager::DestroyWorker.Related issue number
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.