Skip to content

[pull] master from ray-project:master#11

Merged
pull[bot] merged 9 commits intoFuture-Outlier:masterfrom
ray-project:master
Jun 11, 2025
Merged

[pull] master from ray-project:master#11
pull[bot] merged 9 commits intoFuture-Outlier:masterfrom
ray-project:master

Conversation

@pull
Copy link
Copy Markdown

@pull pull bot commented Jun 11, 2025

See Commits and Changes for more details.


Created by pull[bot] (v2.0.0-alpha.1)

Can you help keep this open source service alive? 💖 Please sponsor : )

Myasuka and others added 9 commits June 11, 2025 10:20
Current ray's doc has a description of [how to enable autoscaler
V2](https://docs.ray.io/en/latest/cluster/kubernetes/user-guides/configuring-autoscaling.html#autoscaler-v2-with-kuberay).
However, the ray-cluster.autoscaler-v2.yaml example to the
https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray-cluster.autoscaler-v2.yaml
which does not have related configuration of RAY_enable_autoscaler_v2.
This is because that
[commit-a9c48c](ray-project/kuberay@a9c48c1)
introduced a new feature, which has not been released yet.

To reduce such problem, we should make the ray's doc bind to a specific
released kuberay version.

Closes #53655

Signed-off-by: Yun Tang <myasuka@live.com>
label-based selection is being added as a feature. tracker here:
#51564

As part of the feature, we should allow users to observe labels of nodes
and selection rules of tasks and actors.
This PR adds these fields to:
- state api CLI output
- state api API responses
- Ray Dashboard UI

Nodes:
![Screenshot 2025-05-29 at 2 26
30 PM](https://github.com/user-attachments/assets/1dcd5faf-bdcf-440b-b03f-0eeeb23a46bb)
![Screenshot 2025-05-29 at 2 26
26 PM](https://github.com/user-attachments/assets/8b37c283-342f-43e6-8d87-19e56e514817)

Tasks:
![Screenshot 2025-05-29 at 2 26
49 PM](https://github.com/user-attachments/assets/15c1fc6f-e540-4030-99ec-ae9124df83eb)
![Screenshot 2025-05-29 at 2 26
53 PM](https://github.com/user-attachments/assets/e078981f-cf46-47a1-b49a-4e37ffa63502)

Actors:
![Screenshot 2025-05-29 at 3 06
15 PM](https://github.com/user-attachments/assets/7de5c507-c44d-412f-828a-6b30a7466e04)
![Screenshot 2025-05-29 at 3 06
17 PM](https://github.com/user-attachments/assets/954b36e0-9042-41e6-a4a9-d88803559975)

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
We just had a production outage at Roblox due to a large number of
headless services created for Ray overwhelming the service mesh sidecars
and causing ingress gateways to fail.

Another thing we observed is that mTLS slows down some jobs
significantly. So we ended up using istio with interception mode "none"
to only proxy the 8265 port to expose the head node securely, and leave
the head - worker grpc connections unencrypted. But I wasn't sure it's a
common enough issue to mention in the docs.

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

---------

Signed-off-by: Steve Han <36038610+han-steve@users.noreply.github.com>
Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com>
Co-authored-by: Kai-Hsun Chen <kaihsun@anyscale.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…sma (#53713)

Currently actor restarts won't because lineage ref counting doesn't work
for actors with restarts. See description here
#51653 (comment).

Minimal repro
```
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)  # head
ray.init(address=cluster.address)
worker1 = cluster.add_node(num_cpus=1)

@ray.remote(num_cpus=1, max_restarts=1)
class Actor:
    def __init__(self, config):
        self.config = config

    def ping(self):
        return self.config

# Arg is >100kb so will go in the object store
actor = Actor.remote(np.zeros(100 * 1024 * 1024, dtype=np.uint8))
ray.get(actor.ping.remote())

worker2 = cluster.add_node(num_cpus=1)
cluster.remove_node(worker1, allow_graceful=True)

# This line will break
ray.get(actor.ping.remote())
```

---------

Signed-off-by: dayshah <dhyey2019@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Not used outside of this file.

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Most of our io_contexts are only run on a single thread. asio has a
concurrency hint option to improve performance in this case, so I'm
adding an optional parameter to the constructor if the io context is
guaranteed to run on a single thread. Also serves as a source of truth
for whether an io_context is only running from a single thread or
multiple threads.

This is for sure a premature optimization, but shouldn't hurt either
way.

What the boost io_context documentation says about the `1` option (we're
on 1.81).
https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/overview/core/concurrency_hint.html

> The implementation assumes that the io_context will be run from a
single thread, and applies several optimisations based on this
assumption.
For example, when a handler is posted from within another handler, the
new handler is added to a fast thread-local queue (with the consequence
that the new handler is held back until the currently executing handler
finishes).
The io_context still provides full thread safety, and distinct I/O
objects may be used from any thread.

---------

Signed-off-by: dayshah <dhyey2019@gmail.com>
Used by Train and Tune.

Only `s3` type was used, so refactored.

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
The test is failing on windows because of httpx client timeout.
Previously with the `requests` library, there was no timeout set.
Removing this timeout for httpx passes the test on windows.

---------

Signed-off-by: akyang-anyscale <alexyang@anyscale.com>
…53725)

Right now, when users call `ray.train.report` with a gpu tensor in their
ray train train_function, the train controller fails to deserialize the
gpu tensor, causing the training run to hang. With this change, the
train workers running the train_function preemptively raise a
ValueError, allowing the train run to terminate properly.

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
Co-authored-by: Timothy Seah <tseah@anyscale.com>
@pull pull bot added the ⤵️ pull label Jun 11, 2025
@pull pull bot merged commit 0c84a2c into Future-Outlier:master Jun 11, 2025
pull bot pushed a commit that referenced this pull request Nov 18, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660)

## Summary

This PR fixes a heap corruption bug that causes the driver to crash with
SIGABRT. The issue is caused by a use-after-free when the `RayletClient`
object is destroyed while an asynchronous RPC callback is still pending.

## Problem Description

### Scenario

A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter ->
map_batches -> write` running for 4+ hours, where workers use elastic
resources with low job priority causing frequent worker deaths due to
pod preemption, crashes the driver with SIGABRT:
```
corrupted size vs. prev_size
*** SIGABRT received at time=1761916578 on cpu 30 ***
PC: @ 0x7f073569d9fc (unknown) pthread_kill
Aborted (core dumped)
```



### Trigger Conditions

After reproducing with an ASan image, Asan reveals the actual
use-after-free at:
```
 #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628
    #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377
    #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338
    #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61
    #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111
    #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590
    #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293
    #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61
    #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111
    #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290
    #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590
    #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109
    #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30
    #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
2025-11-14 16:15:05,608	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112
    #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110
    #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60
    #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88
    #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111
    #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70
    #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40
    #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492
    #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210
    #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63
2025-11-14 16:15:05,742	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193
    #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120
    #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179
    #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442
    #38 0x7ff28b0a58bf  (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf)

0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38)
freed by thread T68 here:
2025-11-14 16:15:05,876	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172
    #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145
    #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496
    #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74
    #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538
    #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184
    #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705
    #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154
    #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122
    #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211
    #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168
    #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535
    #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421
    #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578
    #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50
    #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183
    #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114
    #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69
    #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85
    #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45
```

This is a **non-deterministic race condition** that occurs under the
following sequence:

1. Worker A's pod is preempted → Worker A dies
2. Objects on Worker A are lost
3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated
4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects
this → `Disconnect` is called
5. The `RayletClient` corresponding to Worker B on the driver is
destroyed
6. RPC callback executes and accesses the already-freed `RayletClient` →
use-after-free triggers crash

Whether the use-after-free occurs depends on the relative timing of
steps 5 and 6. In scenarios with frequent pod preemptions, object
recovery frequently triggers `PinObjectIDs`, making this race condition
more likely to occur.

### Root Cause

In `RayletClient::PinObjectIDs`, the RPC callback lambda directly
captured the raw `this` pointer:

```cpp
auto rpc_callback = [this, callback = std::move(callback)](...) {
    pins_in_flight_--;  // Accessing member via 'this' pointer
    ...
};
```

If the `RayletClient` object is destroyed before the async RPC callback
executes, the callback will access freed memory through the dangling
`this` pointer, leading to heap corruption and SIGABRT with the error
message "corrupted size vs. prev_size".

## Solution

The fix ensures that the `RayletClient` object remains alive during the
asynchronous callback execution by:

1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The
class already inherits from this base class (line 43 in
`raylet_client.h`), which enables safe shared pointer management.

2. **Capturing `shared_from_this()` in the lambda**: Instead of
capturing the raw `this` pointer, the callback now captures a
`shared_ptr` to the object. The `shared_from_this()` is called before
incrementing `pins_in_flight_` to ensure proper lifetime management:

```cpp
// Capture shared_from_this() before incrementing to ensure object lifetime
// is extended for the async callback, preventing use-after-free.
auto self = shared_from_this();
pins_in_flight_++;
auto rpc_callback = [self, callback = std::move(callback)](
                        Status status, rpc::PinObjectIDsReply &&reply) {
  self->pins_in_flight_--;
  callback(status, std::move(reply));
};
```

This ensures that the `RayletClient` object's lifetime is extended until
the callback completes, preventing the use-after-free bug. By capturing
the shared pointer before incrementing the counter, we also ensure that
if `shared_from_this()` were to fail (though it shouldn't in normal
usage), we don't leave the counter in an inconsistent state.

## Code Changes

- **File**: `src/ray/raylet_rpc_client/raylet_client.cc`
- **Method**: `RayletClient::PinObjectIDs`
- **Change**: Replace `this` capture with `shared_from_this()` capture
in the RPC callback lambda

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Co-authored-by: gulonglong <gulonglong@stepfun.com>
Future-Outlier pushed a commit that referenced this pull request Dec 7, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660)

## Summary

This PR fixes a heap corruption bug that causes the driver to crash with
SIGABRT. The issue is caused by a use-after-free when the `RayletClient`
object is destroyed while an asynchronous RPC callback is still pending.

## Problem Description

### Scenario

A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter ->
map_batches -> write` running for 4+ hours, where workers use elastic
resources with low job priority causing frequent worker deaths due to
pod preemption, crashes the driver with SIGABRT:
```
corrupted size vs. prev_size
*** SIGABRT received at time=1761916578 on cpu 30 ***
PC: @ 0x7f073569d9fc (unknown) pthread_kill
Aborted (core dumped)
```

### Trigger Conditions

After reproducing with an ASan image, Asan reveals the actual
use-after-free at:
```
 #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628
    #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377
    #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338
    #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61
    #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111
    #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590
    #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293
    #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61
    #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111
    #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290
    #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590
    #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109
    #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30
    #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
2025-11-14 16:15:05,608	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112
    #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110
    #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60
    #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88
    #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111
    #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70
    #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40
    #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492
    #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210
    #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63
2025-11-14 16:15:05,742	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193
    #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120
    #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179
    #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442
    #38 0x7ff28b0a58bf  (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf)

0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38)
freed by thread T68 here:
2025-11-14 16:15:05,876	INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']}
    #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172
    #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145
    #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496
    #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74
    #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538
    #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184
    #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705
    #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154
    #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122
    #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211
    #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168
    #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535
    #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421
    #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578
    #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50
    #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183
    #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114
    #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69
    #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85
    #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45
```

This is a **non-deterministic race condition** that occurs under the
following sequence:

1. Worker A's pod is preempted → Worker A dies
2. Objects on Worker A are lost
3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated
4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects
this → `Disconnect` is called
5. The `RayletClient` corresponding to Worker B on the driver is
destroyed
6. RPC callback executes and accesses the already-freed `RayletClient` →
use-after-free triggers crash

Whether the use-after-free occurs depends on the relative timing of
steps 5 and 6. In scenarios with frequent pod preemptions, object
recovery frequently triggers `PinObjectIDs`, making this race condition
more likely to occur.

### Root Cause

In `RayletClient::PinObjectIDs`, the RPC callback lambda directly
captured the raw `this` pointer:

```cpp
auto rpc_callback = [this, callback = std::move(callback)](...) {
    pins_in_flight_--;  // Accessing member via 'this' pointer
    ...
};
```

If the `RayletClient` object is destroyed before the async RPC callback
executes, the callback will access freed memory through the dangling
`this` pointer, leading to heap corruption and SIGABRT with the error
message "corrupted size vs. prev_size".

## Solution

The fix ensures that the `RayletClient` object remains alive during the
asynchronous callback execution by:

1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The
class already inherits from this base class (line 43 in
`raylet_client.h`), which enables safe shared pointer management.

2. **Capturing `shared_from_this()` in the lambda**: Instead of
capturing the raw `this` pointer, the callback now captures a
`shared_ptr` to the object. The `shared_from_this()` is called before
incrementing `pins_in_flight_` to ensure proper lifetime management:

```cpp
// Capture shared_from_this() before incrementing to ensure object lifetime
// is extended for the async callback, preventing use-after-free.
auto self = shared_from_this();
pins_in_flight_++;
auto rpc_callback = [self, callback = std::move(callback)](
                        Status status, rpc::PinObjectIDsReply &&reply) {
  self->pins_in_flight_--;
  callback(status, std::move(reply));
};
```

This ensures that the `RayletClient` object's lifetime is extended until
the callback completes, preventing the use-after-free bug. By capturing
the shared pointer before incrementing the counter, we also ensure that
if `shared_from_this()` were to fail (though it shouldn't in normal
usage), we don't leave the counter in an inconsistent state.

## Code Changes

- **File**: `src/ray/raylet_rpc_client/raylet_client.cc`
- **Method**: `RayletClient::PinObjectIDs`
- **Change**: Replace `this` capture with `shared_from_this()` capture
in the RPC callback lambda

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Co-authored-by: gulonglong <gulonglong@stepfun.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
pull bot pushed a commit that referenced this pull request Feb 18, 2026
ray-project#61034)

Currently, there is a chance that a worker can crash on the `getenv`
syscall from the otel lazy initialization. We found the race is between
`setenv` on the user thread (`setenv(RBLN_DEVICES)`) and `getenv` on the
worker internal thread. However, we can't forbid `setenv` on a user's
thread; the only thing we can do is not call `getenv` once the user's
thread starts.

Here is the backtrace of the crash we found by intercepting the
`getenv`:

```
[getenv_preload] setenv name=RBLN_DEVICES value= overwrite=1
[getenv_preload] setenv backtrace:
  #0 /home/ray/getenv_trace_preload.so(setenv+0x73) [0x748a77ea870b]
  #1 ray::IDLE(+0x224d5b) [0x59f10aeead5b]
  #2 ray::IDLE(+0x13dfc3) [0x59f10ae03fc3]
  #3 ray::IDLE(_PyEval_EvalFrameDefault+0x313) [0x59f10adf3703]
  #4 ray::IDLE(+0x184bfd) [0x59f10ae4abfd]
  #5 ray::IDLE(+0x19da04) [0x59f10ae63a04]
  #6 ray::IDLE(_PyEval_EvalFrameDefault+0x115a) [0x59f10adf454a]
  #7 ray::IDLE(_PyFunction_Vectorcall+0x6c) [0x59f10ae03dfc]
  #8 ray::IDLE(_PyEval_EvalFrameDefault+0x49ae) [0x59f10adf7d9e]
  #9 ray::IDLE(_PyFunction_Vectorcall+0x6c) [0x59f10ae03dfc]
  #10 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x9a9333) [0x748a76270333]
  #11 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFN3ray6StatusERKNS0_3rpc7AddressENS2_8TaskTypeESsRKNS0_4core11RayFunctionERKSt13unordered_mapISsdSt4hashISsESt8equal_toISsESaISt4pairIKSsdEEERKSt6vectorISt10shared_ptrINS0_9RayObjectEESaISQ_EERKSN_INS2_15ObjectReferenceESaISV_EERSH_S10_PSN_ISG_INS0_8ObjectIDESQ_ESaIS12_EES15_PSN_ISG_IS11_bESaIS16_EERSO_INS0_17LocalMemoryBufferEEPbPSsS1E_RKSN_INS0_16ConcurrencyGroupESaIS1F_EESsbbblRKSt8optionalISsEEPFS1_S5_S6_SsSA_SM_SU_SZ_SsSsS15_S15_S19_S1C_S1D_S1E_S1E_S1J_SsbbblS1L_EE9_M_invokeERKSt9_Any_dataS5_OS6_OSsSA_SM_SU_SZ_S10_S10_OS15_S1X_OS19_S1C_OS1D_OS1E_S20_S1J_S1W_ObS21_S21_OlS1N_+0x1ab) [0x748a761786ab]
  #12 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker11ExecuteTaskERKNS_17TaskSpecificationESt8optionalISt13unordered_mapISsSt6vectorISt4pairIldESaIS9_EESt4hashISsESt8equal_toISsESaIS8_IKSsSB_EEEEPS7_IS8_INS_8ObjectIDESt10shared_ptrINS_9RayObjectEEESaISP_EESS_PS7_IS8_ISL_bESaIST_EEPN6google8protobuf16RepeatedPtrFieldINS_3rpc20ObjectReferenceCountEEEPbPSsS15_+0x1166) [0x748a76320a96]
  #13 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFN3ray6StatusERKNS0_17TaskSpecificationESt8optionalISt13unordered_mapISsSt6vectorISt4pairIldESaIS9_EESt4hashISsESt8equal_toISsESaIS8_IKSsSB_EEEEPS7_IS8_INS0_8ObjectIDESt10shared_ptrINS0_9RayObjectEEESaISP_EESS_PS7_IS8_ISL_bESaIST_EEPN6google8protobuf16RepeatedPtrFieldINS0_3rpc20ObjectReferenceCountEEEPbPSsS15_ESt5_BindIFMNS0_4core10CoreWorkerEFS1_S4_SK_SS_SS_SW_S13_S14_S15_S15_EPS19_St12_PlaceholderILi1EES1D_ILi2EES1D_ILi3EES1D_ILi4EES1D_ILi5EES1D_ILi6EES1D_ILi7EES1D_ILi8EES1D_ILi9EEEEE9_M_invokeERKSt9_Any_dataS4_OSK_OSS_S1U_OSW_OS13_OS14_OS15_S1Y_+0x87) [0x748a762e8647]
  #14 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb5186d) [0x748a7641886d]
  #15 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb557c5) [0x748a7641c7c5]
  #16 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x103e3eb) [0x748a769053eb]
  #17 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1034f0b) [0x748a768fbf0b]
  #18 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb6f21b) [0x748a7643621b]
  #19 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x15893cb) [0x748a76e503cb]
  #20 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158ad69) [0x748a76e51d69]
  #21 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158b472) [0x748a76e52472]
  #22 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker20RunTaskExecutionLoopEv+0x132) [0x748a762e4252]
  #23 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray4core21CoreWorkerProcessImpl26RunWorkerTaskExecutionLoopEv+0x41) [0x748a76336bd1]
  #24 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x8a45c1) [0x748a7616b5c1]
  #25 ray::IDLE(_PyEval_EvalFrameDefault+0x6fb) [0x59f10adf3aeb]
  #26 ray::IDLE(_PyFunction_Vectorcall+0x6c) [0x59f10ae03dfc]
  #27 ray::IDLE(_PyEval_EvalFrameDefault+0x6fb) [0x59f10adf3aeb]
  #28 ray::IDLE(+0x1d5cac) [0x59f10ae9bcac]
  #29 ray::IDLE(PyEval_EvalCode+0x85) [0x59f10ae9bbf5]
  #30 ray::IDLE(+0x20732a) [0x59f10aecd32a]
  #31 ray::IDLE(+0x201d13) [0x59f10aec7d13]
  #32 ray::IDLE(+0x976be) [0x59f10ad5d6be]
  #33 ray::IDLE(_PyRun_SimpleFileObject+0x1bb) [0x59f10aec23db]
  #34 ray::IDLE(_PyRun_AnyFileObject+0x44) [0x59f10aec1f74]
  #35 ray::IDLE(Py_RunMain+0x371) [0x59f10aebf3e1]
  #36 ray::IDLE(Py_BytesMain+0x37) [0x59f10ae8f447]
  #37 /lib/x86_64-linux-gnu/libc.so.6(+0x29d90) [0x748a77baad90]
  #38 /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0x80) [0x748a77baae40]
  #39 ray::IDLE(+0x1c930e) [0x59f10ae8f30e]

[getenv_preload] getenv name=OTEL_CPP_EXPORTER_OTLP_METRICS_RETRY_BACKOFF_MULTIPLIER
[getenv_preload] backtrace:
  #0 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x10a9d17) [0x7321ce3c9d17]
  #1 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x10abe2b) [0x7321ce3cbe2b]
  #2 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1050ffc) [0x7321ce370ffc]
  #3 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x104f4d7) [0x7321ce36f4d7]
  #4 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1045833) [0x7321ce365833]
  #5 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xa6c760) [0x7321cdd8c760]
  #6 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xe69d9a) [0x7321ce189d9a]
  #7 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray3rpc14ClientCallImplINS0_16HealthCheckReplyEE15OnReplyReceivedEv+0x165) [0x7321ce18c005]
  #8 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFvvEZN3ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E9_M_invokeERKSt9_Any_data+0x15) [0x7321cdd8e475]
  #9 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x103e3eb) [0x7321ce35e3eb]
  #10 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1034f0b) [0x7321ce354f0b]
  #11 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb6f21b) [0x7321cde8f21b]
  #12 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x15893cb) [0x7321ce8a93cb]
  #13 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158ad69) [0x7321ce8aad69]
  #14 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158b472) [0x7321ce8ab472]
  #15 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xa6bb54) [0x7321cdd8bb54]
  #16 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xba2250) [0x7321cdec2250]
  #17 /lib/x86_64-linux-gnu/libc.so.6(+0x94ac3) [0x7321cf66eac3]
  #18 /lib/x86_64-linux-gnu/libc.so.6(+0x1268d0) [0x7321cf7008d0]

*** SIGSEGV received at time=1770862205 on cpu 1 ***
PC: @     0x748a77bc5c1d  (unknown)  getenv
    @     0x748a77bc3520  (unknown)  (unknown)
{"asctime":"2026-02-11 18:10:05,910","levelname":"E","message":"*** SIGSEGV received at time=1770862205 on cpu 1 ***","filename":"logging.cc","lineno":474}
{"asctime":"2026-02-11 18:10:05,910","levelname":"E","message":"PC: @     0x748a77bc5c1d  (unknown)  getenv","filename":"logging.cc","lineno":474}
{"asctime":"2026-02-11 18:10:05,910","levelname":"E","message":"    @     0x748a77bc3520  (unknown)  (unknown)","filename":"logging.cc","lineno":474}
Fatal Python error: Segmentation fault
```

According to the backtrace, we can identify that it is the
`OtlpGrpcMetricExporterOptions`, [which called
`getenv(OTEL_CPP_EXPORTER_OTLP_METRICS_RETRY_BACKOFF_MULTIPLIER)`](https://github.com/open-telemetry/opentelemetry-cpp/blob/13ad05a6f431efb76995cffb1225d26b45374749/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc#L47),
getting initialized by calling `InitOpenTelemetryExporter` in the
`metrics_agent_client_->WaitForServerReady()` callback, that causes the
issue.

This PR moves `OtlpGrpcMetricExporterOptions` into
`OpenTelemetryMetricRecorder` (so that we keep otel details
encapsulated) and moves its initialization early to `stats::Init()`, to
force the `OtlpGrpcMetricExporterOptions` to be initialized early, so
that we don't call `getenv` afterward.

---------

Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
pull bot pushed a commit that referenced this pull request Feb 28, 2026
## Description
grpc 1.57.1 will call `GetEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG")`
on every grpc channel establishment for parsing load-balancing policy.
This causes race conditions between user tasks as they are allowed to do
setenv at anytime. This PR upgrades the grpc lib to 1.58.0 to get rid of
the `GetEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG")`.

```
(gdb) bt
#0  __pthread_kill_implementation (no_tid=0, signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:44
#1  __pthread_kill_internal (signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:78
#2  __GI___pthread_kill (threadid=129183804413504, signo=signo@entry=11) at ./nptl/pthread_kill.c:89
#3  0x00007580a7545476 in __GI_raise (sig=11) at ../sysdeps/posix/raise.c:26
#4  <signal handler called>
#5  __pthread_kill_implementation (no_tid=0, signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:44
#6  __pthread_kill_internal (signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:78
#7  __GI___pthread_kill (threadid=129183804413504, signo=signo@entry=11) at ./nptl/pthread_kill.c:89
#8  0x00007580a7545476 in __GI_raise (sig=11) at ../sysdeps/posix/raise.c:26
#9  <signal handler called>
#10 __GI_getenv (name=0x7580a6a078c2 "PC_EXPERIMENTAL_PICKFIRST_LB_CONFIG") at ./stdlib/getenv.c:84
#11 0x00007580a67e8b8a in grpc_core::GetEnv(char const*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#12 0x00007580a649601f in grpc_core::ShufflePickFirstEnabled() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#13 0x00007580a64960ed in grpc_core::json_detail::FinishedJsonObjectLoader<grpc_core::(anonymous namespace)::PickFirstConfig, 1ul, void>::LoadInto(grpc_core::experimental::Json const&, grpc_core::JsonArgs const&, void*, grpc_core::ValidationErrors*) const () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#14 0x00007580a6787384 in grpc_core::json_detail::LoadWrapped::LoadInto(grpc_core::experimental::Json const&, grpc_core::JsonArgs const&, void*, grpc_core::ValidationErrors*) const ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#15 0x00007580a6497b07 in grpc_core::(anonymous namespace)::PickFirstFactory::ParseLoadBalancingConfig(grpc_core::experimental::Json const&) const ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#16 0x00007580a67c18a7 in grpc_core::LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(grpc_core::experimental::Json const&) const ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#17 0x00007580a66ad9b8 in grpc_core::ClientChannel::OnResolverResultChangedLocked(grpc_core::Resolver::Result) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#18 0x00007580a66ae452 in grpc_core::ClientChannel::ResolverResultHandler::ReportResult(grpc_core::Resolver::Result) ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#19 0x00007580a63bc603 in grpc_core::PollingResolver::OnRequestCompleteLocked(grpc_core::Resolver::Result) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#20 0x00007580a63bcb2d in std::_Function_handler<void (), grpc_core::PollingResolver::OnRequestComplete(grpc_core::Resolver::Result)::{lambda()#1}>::_M_invoke(std::_Any_data const&)
    () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#21 0x00007580a67cbf46 in grpc_core::WorkSerializer::WorkSerializerImpl::Run(std::function<void ()>, grpc_core::DebugLocation const&) ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#22 0x00007580a67cc0ea in grpc_core::WorkSerializer::Run(std::function<void ()>, grpc_core::DebugLocation const&) ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#23 0x00007580a63bd117 in grpc_core::PollingResolver::OnRequestComplete(grpc_core::Resolver::Result) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#24 0x00007580a63b3f86 in grpc_core::(anonymous namespace)::AresClientChannelDNSResolver::AresRequestWrapper::OnHostnameResolved(void*, absl::lts_20230802::Status) ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#25 0x00007580a67c44c4 in grpc_core::ExecCtx::Flush() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#26 0x00007580a63408a2 in grpc_core::ExecCtx::~ExecCtx() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#27 0x00007580a6740343 in grpc_call_start_batch () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#28 0x00007580a5e281e9 in grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpRecvInitialMetadata, grpc::internal::CallOpRecvMessage<google::protobuf::MessageLite>, grpc::internal::CallOpClientSendClose, grpc::internal::CallOpClientRecvStatus>::ContinueFillOpsAfterInterception() ()
   from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#29 0x00007580a5e2d809 in grpc::internal::BlockingUnaryCallImpl<google::protobuf::MessageLite, google::protobuf::MessageLite>::BlockingUnaryCallImpl(grpc::ChannelInterface*, grpc::inte--Type <RET> for more, q to quit, c to continue without paging--c
rnal::RpcMethod const&, grpc::ClientContext*, google::protobuf::MessageLite const&, google::protobuf::MessageLite*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#30 0x00007580a62d76ea in opentelemetry::proto::collector::metrics::v1::MetricsService::Stub::Export(grpc::ClientContext*, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest const&, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#31 0x00007580a62ca40c in opentelemetry::v1::exporter::otlp::OtlpGrpcClient::DelegateExport(opentelemetry::proto::collector::metrics::v1::MetricsService::StubInterface*, std::unique_ptr<grpc::ClientContext, std::default_delete<grpc::ClientContext> >&&, std::unique_ptr<google::protobuf::Arena, std::default_delete<google::protobuf::Arena> >&&, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest&&, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#32 0x00007580a62c23ed in opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporter::Export(opentelemetry::v1::sdk::metrics::ResourceMetrics const&) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#33 0x00007580a62c0334 in (anonymous namespace)::OpenTelemetryMetricExporter::Export(opentelemetry::v1::sdk::metrics::ResourceMetrics const&) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#34 0x00007580a62e5fdf in opentelemetry::v1::sdk::metrics::PeriodicExportingMetricReader::CollectAndExportOnce()::{lambda()#1}::operator()() const::{lambda(opentelemetry::v1::sdk::metrics::ResourceMetrics&)#1}::operator()(opentelemetry::v1::sdk::metrics::ResourceMetrics&) const () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#35 0x00007580a62ee7a6 in opentelemetry::v1::sdk::metrics::MetricReader::Collect(opentelemetry::v1::nostd::function_ref<bool (opentelemetry::v1::sdk::metrics::ResourceMetrics&)>) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#36 0x00007580a62e5085 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<opentelemetry::v1::sdk::metrics::PeriodicExportingMetricReader::CollectAndExportOnce()::{lambda()#1}> > >::_M_run() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#37 0x00007580a6997be0 in execute_native_thread_routine () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#38 0x00007580a7597ac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#39 0x00007580a76298d0 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
```

Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
pull bot pushed a commit that referenced this pull request Mar 20, 2026
updating rayllm stale hashes for opentelemetry-sdk in
`python/deplocks/llm/rayllm_test_py312_cu130.lock`

failing postmerge build:
https://buildkite.com/ray-project/postmerge/builds/16575#_

Successful local build logs
```
environment instead: https://pip.pypa.io/warnings/venv. Use the --root-user-action option if you know what you are doing and want to suppress this warning.
#10 DONE 395.9s

#11 exporting to image
#11 exporting layers
#11 exporting layers 84.0s done
#11 writing image sha256:38cfb1ceb9d5b48e3c6c51560401e9a184958fc272a942fe76dc0728947aed94 done
#11 naming to cr.ray.io/rayproject/llmgpubuild-py312 done
#11 naming to localhost:5000/rayci-work:llmgpubuild-py312 done
#11 naming to localhost:5000/rayci-work:z-bf8e5fafc62f806191b7a3827b333416d8ec7b169807dde435a970317846c6af done
#11 DONE 84.1s
```

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants