[pull] master from ray-project:master#9
Merged
pull[bot] merged 9 commits intoFuture-Outlier:masterfrom Jun 11, 2025
Merged
Conversation
…oo backend (#53319) Adds single-controller APIs (APIs that can be called from the driver) for creating collectives on a group of actors using `ray.util.collective`. These APIs are currently under `ray.experimental.collective` as they are experimental and to avoid potential conflicts with `ray.util.collective`. See test_experimental_collective::test_api_basic for API usage. - create_collective_group - destroy_collective_group - get_collective_groups Also adds a ray.util.collective backend based on torch.distributed gloo, for convenient testing on CPUs. While ray.util.collective has a pygloo backend, this backend requires pygloo to be installed, and pygloo doesn't seem to be supported on latest versions of Python. --------- Signed-off-by: Stephanie wang <smwang@cs.washington.edu> Signed-off-by: Stephanie Wang <smwang@cs.washington.edu> Co-authored-by: Kai-Hsun Chen <kaihsun@apache.org> Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Remove some open telemetry code that was added in #51077. These files were added to test the complication of the open telemetry library, but we never end up using these files. Test: - CI Signed-off-by: can <can@anyscale.com>
…t_ref` for small and non-GPU objects (#53692) This PR is based on #53630. See #53623 for the issue. In this PR, we clear the object ref when the arg's tensor transport is not OBJECT_STORE. Closes #53623 --------- Signed-off-by: Stephanie wang <smwang@cs.washington.edu> Signed-off-by: Kai-Hsun Chen <kaihsun@anyscale.com> Signed-off-by: Kai-Hsun Chen <kaihsun@apache.org> Co-authored-by: Stephanie wang <smwang@cs.washington.edu> Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
follow up of #53652 Signed-off-by: abrar <abrar@anyscale.com>
Improve the string representation of `WorkerHealthCheckFailedError` to also include the base reason why the health check failed. Signed-off-by: Matthew Deng <matt@anyscale.com>
…#53718) Signed-off-by: Timothy Seah <tseah@anyscale.com> Co-authored-by: Timothy Seah <tseah@anyscale.com>
``` REGRESSION 12.82%: tasks_per_second (THROUGHPUT) regresses from 221.2222291023174 to 192.87246715163326 in benchmarks/many_nodes.json REGRESSION 12.73%: actors_per_second (THROUGHPUT) regresses from 634.2824761754516 to 553.5098466276525 in benchmarks/many_actors.json REGRESSION 12.26%: client__get_calls (THROUGHPUT) regresses from 1160.5254002780266 to 1018.2939193917422 in microbenchmark.json REGRESSION 5.15%: multi_client_put_gigabytes (THROUGHPUT) regresses from 39.896743394372585 to 37.84234603653026 in microbenchmark.json REGRESSION 4.04%: client__tasks_and_get_batch (THROUGHPUT) regresses from 0.9480091293556955 to 0.909684480871914 in microbenchmark.json REGRESSION 3.72%: 1_n_actor_calls_async (THROUGHPUT) regresses from 8318.094433102775 to 8008.806358661164 in microbenchmark.json REGRESSION 3.01%: 1_1_actor_calls_sync (THROUGHPUT) regresses from 2020.4236901532247 to 1959.5608579309087 in microbenchmark.json REGRESSION 2.80%: n_n_async_actor_calls_async (THROUGHPUT) regresses from 23716.451989299432 to 23052.03512506016 in microbenchmark.json REGRESSION 2.71%: single_client_put_gigabytes (THROUGHPUT) regresses from 20.105537951105227 to 19.561225172916046 in microbenchmark.json REGRESSION 2.69%: pgs_per_second (THROUGHPUT) regresses from 13.650631601393242 to 13.282795863244178 in benchmarks/many_pgs.json REGRESSION 1.35%: single_client_tasks_async (THROUGHPUT) regresses from 8081.168521067462 to 7971.849053459262 in microbenchmark.json REGRESSION 1.31%: n_n_actor_calls_async (THROUGHPUT) regresses from 27465.39608393524 to 27105.63998087682 in microbenchmark.json REGRESSION 1.09%: client__tasks_and_put_batch (THROUGHPUT) regresses from 14569.862277318796 to 14411.155262801181 in microbenchmark.json REGRESSION 1.05%: 1_1_async_actor_calls_sync (THROUGHPUT) regresses from 1483.660979687764 to 1468.0999827232097 in microbenchmark.json REGRESSION 0.92%: single_client_get_object_containing_10k_refs (THROUGHPUT) regresses from 12.796724102063072 to 12.67868528378648 in microbenchmark.json REGRESSION 0.88%: placement_group_create/removal (THROUGHPUT) regresses from 768.9082534403586 to 762.110356621388 in microbenchmark.json REGRESSION 0.87%: single_client_tasks_sync (THROUGHPUT) regresses from 969.5757440611114 to 961.1131766783709 in microbenchmark.json REGRESSION 0.35%: client__1_1_actor_calls_async (THROUGHPUT) regresses from 1069.1602586173547 to 1065.4228066614364 in microbenchmark.json REGRESSION 0.23%: client__put_gigabytes (THROUGHPUT) regresses from 0.1529268174148042 to 0.1525808986433169 in microbenchmark.json REGRESSION 0.05%: single_client_put_calls_Plasma_Store (THROUGHPUT) regresses from 5113.112753017668 to 5110.344528620948 in microbenchmark.json REGRESSION 49.81%: dashboard_p99_latency_ms (LATENCY) regresses from 275.082 to 412.087 in benchmarks/many_pgs.json REGRESSION 37.19%: dashboard_p95_latency_ms (LATENCY) regresses from 6.696 to 9.186 in benchmarks/many_pgs.json REGRESSION 36.35%: dashboard_p95_latency_ms (LATENCY) regresses from 2283.949 to 3114.217 in benchmarks/many_actors.json REGRESSION 13.04%: dashboard_p99_latency_ms (LATENCY) regresses from 675.061 to 763.093 in benchmarks/many_tasks.json REGRESSION 11.46%: dashboard_p50_latency_ms (LATENCY) regresses from 3.856 to 4.298 in benchmarks/many_pgs.json REGRESSION 11.23%: dashboard_p95_latency_ms (LATENCY) regresses from 437.195 to 486.283 in benchmarks/many_tasks.json REGRESSION 8.97%: 107374182400_large_object_time (LATENCY) regresses from 29.323037406000026 to 31.951921509999977 in scalability/single_node.json REGRESSION 6.24%: avg_iteration_time (LATENCY) regresses from 1.1950538015365602 to 1.2696449542045594 in stress_tests/stress_test_dead_actors.json REGRESSION 5.86%: dashboard_p50_latency_ms (LATENCY) regresses from 8.293 to 8.779 in benchmarks/many_actors.json REGRESSION 2.91%: time_to_broadcast_1073741824_bytes_to_50_nodes (LATENCY) regresses from 12.241764013000008 to 12.597426240999994 in scalability/object_store.json REGRESSION 1.02%: avg_pg_remove_time_ms (LATENCY) regresses from 1.2291068678679091 to 1.2416502777781075 in stress_tests/stress_test_placement_group.json REGRESSION 0.57%: dashboard_p50_latency_ms (LATENCY) regresses from 5.658 to 5.69 in benchmarks/many_nodes.json REGRESSION 0.34%: 10000_args_time (LATENCY) regresses from 18.764070391999994 to 18.828636121000002 in scalability/single_node.json ``` Signed-off-by: Lonnie Liu <lonnie@anyscale.com> Co-authored-by: Lonnie Liu <lonnie@anyscale.com>
pull bot
pushed a commit
that referenced
this pull request
Nov 18, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660) ## Summary This PR fixes a heap corruption bug that causes the driver to crash with SIGABRT. The issue is caused by a use-after-free when the `RayletClient` object is destroyed while an asynchronous RPC callback is still pending. ## Problem Description ### Scenario A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter -> map_batches -> write` running for 4+ hours, where workers use elastic resources with low job priority causing frequent worker deaths due to pod preemption, crashes the driver with SIGABRT: ``` corrupted size vs. prev_size *** SIGABRT received at time=1761916578 on cpu 30 *** PC: @ 0x7f073569d9fc (unknown) pthread_kill Aborted (core dumped) ``` ### Trigger Conditions After reproducing with an ASan image, Asan reveals the actual use-after-free at: ``` #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628 #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377 #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338 #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61 #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111 #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590 #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293 #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61 #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111 #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290 #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590 #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109 #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30 #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 2025-11-14 16:15:05,608 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112 #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110 #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60 #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88 #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111 #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70 #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40 #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492 #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210 #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63 2025-11-14 16:15:05,742 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193 #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120 #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179 #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442 #38 0x7ff28b0a58bf (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf) 0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38) freed by thread T68 here: 2025-11-14 16:15:05,876 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172 #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145 #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496 #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74 #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538 #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184 #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705 #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154 #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122 #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211 #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168 #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535 #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421 #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578 #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50 #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183 #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114 #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69 #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85 #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45 ``` This is a **non-deterministic race condition** that occurs under the following sequence: 1. Worker A's pod is preempted → Worker A dies 2. Objects on Worker A are lost 3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated 4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects this → `Disconnect` is called 5. The `RayletClient` corresponding to Worker B on the driver is destroyed 6. RPC callback executes and accesses the already-freed `RayletClient` → use-after-free triggers crash Whether the use-after-free occurs depends on the relative timing of steps 5 and 6. In scenarios with frequent pod preemptions, object recovery frequently triggers `PinObjectIDs`, making this race condition more likely to occur. ### Root Cause In `RayletClient::PinObjectIDs`, the RPC callback lambda directly captured the raw `this` pointer: ```cpp auto rpc_callback = [this, callback = std::move(callback)](...) { pins_in_flight_--; // Accessing member via 'this' pointer ... }; ``` If the `RayletClient` object is destroyed before the async RPC callback executes, the callback will access freed memory through the dangling `this` pointer, leading to heap corruption and SIGABRT with the error message "corrupted size vs. prev_size". ## Solution The fix ensures that the `RayletClient` object remains alive during the asynchronous callback execution by: 1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The class already inherits from this base class (line 43 in `raylet_client.h`), which enables safe shared pointer management. 2. **Capturing `shared_from_this()` in the lambda**: Instead of capturing the raw `this` pointer, the callback now captures a `shared_ptr` to the object. The `shared_from_this()` is called before incrementing `pins_in_flight_` to ensure proper lifetime management: ```cpp // Capture shared_from_this() before incrementing to ensure object lifetime // is extended for the async callback, preventing use-after-free. auto self = shared_from_this(); pins_in_flight_++; auto rpc_callback = [self, callback = std::move(callback)]( Status status, rpc::PinObjectIDsReply &&reply) { self->pins_in_flight_--; callback(status, std::move(reply)); }; ``` This ensures that the `RayletClient` object's lifetime is extended until the callback completes, preventing the use-after-free bug. By capturing the shared pointer before incrementing the counter, we also ensure that if `shared_from_this()` were to fail (though it shouldn't in normal usage), we don't leave the counter in an inconsistent state. ## Code Changes - **File**: `src/ray/raylet_rpc_client/raylet_client.cc` - **Method**: `RayletClient::PinObjectIDs` - **Change**: Replace `this` capture with `shared_from_this()` capture in the RPC callback lambda Signed-off-by: dragongu <andrewgu@vip.qq.com> Co-authored-by: gulonglong <gulonglong@stepfun.com>
Future-Outlier
pushed a commit
that referenced
this pull request
Dec 7, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660) ## Summary This PR fixes a heap corruption bug that causes the driver to crash with SIGABRT. The issue is caused by a use-after-free when the `RayletClient` object is destroyed while an asynchronous RPC callback is still pending. ## Problem Description ### Scenario A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter -> map_batches -> write` running for 4+ hours, where workers use elastic resources with low job priority causing frequent worker deaths due to pod preemption, crashes the driver with SIGABRT: ``` corrupted size vs. prev_size *** SIGABRT received at time=1761916578 on cpu 30 *** PC: @ 0x7f073569d9fc (unknown) pthread_kill Aborted (core dumped) ``` ### Trigger Conditions After reproducing with an ASan image, Asan reveals the actual use-after-free at: ``` #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628 #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377 #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338 #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61 #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111 #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590 #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293 #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61 #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111 #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290 #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590 #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109 #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30 #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 2025-11-14 16:15:05,608 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112 #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110 #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60 #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88 #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111 #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70 #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40 #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492 #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210 #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63 2025-11-14 16:15:05,742 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193 #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120 #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179 #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442 #38 0x7ff28b0a58bf (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf) 0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38) freed by thread T68 here: 2025-11-14 16:15:05,876 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172 #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145 #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496 #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74 #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538 #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184 #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705 #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154 #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122 #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211 #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168 #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535 #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421 #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578 #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50 #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183 #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114 #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69 #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85 #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45 ``` This is a **non-deterministic race condition** that occurs under the following sequence: 1. Worker A's pod is preempted → Worker A dies 2. Objects on Worker A are lost 3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated 4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects this → `Disconnect` is called 5. The `RayletClient` corresponding to Worker B on the driver is destroyed 6. RPC callback executes and accesses the already-freed `RayletClient` → use-after-free triggers crash Whether the use-after-free occurs depends on the relative timing of steps 5 and 6. In scenarios with frequent pod preemptions, object recovery frequently triggers `PinObjectIDs`, making this race condition more likely to occur. ### Root Cause In `RayletClient::PinObjectIDs`, the RPC callback lambda directly captured the raw `this` pointer: ```cpp auto rpc_callback = [this, callback = std::move(callback)](...) { pins_in_flight_--; // Accessing member via 'this' pointer ... }; ``` If the `RayletClient` object is destroyed before the async RPC callback executes, the callback will access freed memory through the dangling `this` pointer, leading to heap corruption and SIGABRT with the error message "corrupted size vs. prev_size". ## Solution The fix ensures that the `RayletClient` object remains alive during the asynchronous callback execution by: 1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The class already inherits from this base class (line 43 in `raylet_client.h`), which enables safe shared pointer management. 2. **Capturing `shared_from_this()` in the lambda**: Instead of capturing the raw `this` pointer, the callback now captures a `shared_ptr` to the object. The `shared_from_this()` is called before incrementing `pins_in_flight_` to ensure proper lifetime management: ```cpp // Capture shared_from_this() before incrementing to ensure object lifetime // is extended for the async callback, preventing use-after-free. auto self = shared_from_this(); pins_in_flight_++; auto rpc_callback = [self, callback = std::move(callback)]( Status status, rpc::PinObjectIDsReply &&reply) { self->pins_in_flight_--; callback(status, std::move(reply)); }; ``` This ensures that the `RayletClient` object's lifetime is extended until the callback completes, preventing the use-after-free bug. By capturing the shared pointer before incrementing the counter, we also ensure that if `shared_from_this()` were to fail (though it shouldn't in normal usage), we don't leave the counter in an inconsistent state. ## Code Changes - **File**: `src/ray/raylet_rpc_client/raylet_client.cc` - **Method**: `RayletClient::PinObjectIDs` - **Change**: Replace `this` capture with `shared_from_this()` capture in the RPC callback lambda Signed-off-by: dragongu <andrewgu@vip.qq.com> Co-authored-by: gulonglong <gulonglong@stepfun.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
pull bot
pushed a commit
that referenced
this pull request
Feb 18, 2026
ray-project#61034) Currently, there is a chance that a worker can crash on the `getenv` syscall from the otel lazy initialization. We found the race is between `setenv` on the user thread (`setenv(RBLN_DEVICES)`) and `getenv` on the worker internal thread. However, we can't forbid `setenv` on a user's thread; the only thing we can do is not call `getenv` once the user's thread starts. Here is the backtrace of the crash we found by intercepting the `getenv`: ``` [getenv_preload] setenv name=RBLN_DEVICES value= overwrite=1 [getenv_preload] setenv backtrace: #0 /home/ray/getenv_trace_preload.so(setenv+0x73) [0x748a77ea870b] #1 ray::IDLE(+0x224d5b) [0x59f10aeead5b] #2 ray::IDLE(+0x13dfc3) [0x59f10ae03fc3] #3 ray::IDLE(_PyEval_EvalFrameDefault+0x313) [0x59f10adf3703] #4 ray::IDLE(+0x184bfd) [0x59f10ae4abfd] #5 ray::IDLE(+0x19da04) [0x59f10ae63a04] #6 ray::IDLE(_PyEval_EvalFrameDefault+0x115a) [0x59f10adf454a] #7 ray::IDLE(_PyFunction_Vectorcall+0x6c) [0x59f10ae03dfc] #8 ray::IDLE(_PyEval_EvalFrameDefault+0x49ae) [0x59f10adf7d9e] #9 ray::IDLE(_PyFunction_Vectorcall+0x6c) [0x59f10ae03dfc] #10 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x9a9333) [0x748a76270333] #11 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFN3ray6StatusERKNS0_3rpc7AddressENS2_8TaskTypeESsRKNS0_4core11RayFunctionERKSt13unordered_mapISsdSt4hashISsESt8equal_toISsESaISt4pairIKSsdEEERKSt6vectorISt10shared_ptrINS0_9RayObjectEESaISQ_EERKSN_INS2_15ObjectReferenceESaISV_EERSH_S10_PSN_ISG_INS0_8ObjectIDESQ_ESaIS12_EES15_PSN_ISG_IS11_bESaIS16_EERSO_INS0_17LocalMemoryBufferEEPbPSsS1E_RKSN_INS0_16ConcurrencyGroupESaIS1F_EESsbbblRKSt8optionalISsEEPFS1_S5_S6_SsSA_SM_SU_SZ_SsSsS15_S15_S19_S1C_S1D_S1E_S1E_S1J_SsbbblS1L_EE9_M_invokeERKSt9_Any_dataS5_OS6_OSsSA_SM_SU_SZ_S10_S10_OS15_S1X_OS19_S1C_OS1D_OS1E_S20_S1J_S1W_ObS21_S21_OlS1N_+0x1ab) [0x748a761786ab] #12 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker11ExecuteTaskERKNS_17TaskSpecificationESt8optionalISt13unordered_mapISsSt6vectorISt4pairIldESaIS9_EESt4hashISsESt8equal_toISsESaIS8_IKSsSB_EEEEPS7_IS8_INS_8ObjectIDESt10shared_ptrINS_9RayObjectEEESaISP_EESS_PS7_IS8_ISL_bESaIST_EEPN6google8protobuf16RepeatedPtrFieldINS_3rpc20ObjectReferenceCountEEEPbPSsS15_+0x1166) [0x748a76320a96] #13 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFN3ray6StatusERKNS0_17TaskSpecificationESt8optionalISt13unordered_mapISsSt6vectorISt4pairIldESaIS9_EESt4hashISsESt8equal_toISsESaIS8_IKSsSB_EEEEPS7_IS8_INS0_8ObjectIDESt10shared_ptrINS0_9RayObjectEEESaISP_EESS_PS7_IS8_ISL_bESaIST_EEPN6google8protobuf16RepeatedPtrFieldINS0_3rpc20ObjectReferenceCountEEEPbPSsS15_ESt5_BindIFMNS0_4core10CoreWorkerEFS1_S4_SK_SS_SS_SW_S13_S14_S15_S15_EPS19_St12_PlaceholderILi1EES1D_ILi2EES1D_ILi3EES1D_ILi4EES1D_ILi5EES1D_ILi6EES1D_ILi7EES1D_ILi8EES1D_ILi9EEEEE9_M_invokeERKSt9_Any_dataS4_OSK_OSS_S1U_OSW_OS13_OS14_OS15_S1Y_+0x87) [0x748a762e8647] #14 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb5186d) [0x748a7641886d] #15 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb557c5) [0x748a7641c7c5] #16 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x103e3eb) [0x748a769053eb] #17 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1034f0b) [0x748a768fbf0b] #18 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb6f21b) [0x748a7643621b] #19 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x15893cb) [0x748a76e503cb] #20 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158ad69) [0x748a76e51d69] #21 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158b472) [0x748a76e52472] #22 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray4core10CoreWorker20RunTaskExecutionLoopEv+0x132) [0x748a762e4252] #23 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray4core21CoreWorkerProcessImpl26RunWorkerTaskExecutionLoopEv+0x41) [0x748a76336bd1] #24 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x8a45c1) [0x748a7616b5c1] #25 ray::IDLE(_PyEval_EvalFrameDefault+0x6fb) [0x59f10adf3aeb] #26 ray::IDLE(_PyFunction_Vectorcall+0x6c) [0x59f10ae03dfc] #27 ray::IDLE(_PyEval_EvalFrameDefault+0x6fb) [0x59f10adf3aeb] #28 ray::IDLE(+0x1d5cac) [0x59f10ae9bcac] #29 ray::IDLE(PyEval_EvalCode+0x85) [0x59f10ae9bbf5] #30 ray::IDLE(+0x20732a) [0x59f10aecd32a] #31 ray::IDLE(+0x201d13) [0x59f10aec7d13] #32 ray::IDLE(+0x976be) [0x59f10ad5d6be] #33 ray::IDLE(_PyRun_SimpleFileObject+0x1bb) [0x59f10aec23db] #34 ray::IDLE(_PyRun_AnyFileObject+0x44) [0x59f10aec1f74] #35 ray::IDLE(Py_RunMain+0x371) [0x59f10aebf3e1] #36 ray::IDLE(Py_BytesMain+0x37) [0x59f10ae8f447] #37 /lib/x86_64-linux-gnu/libc.so.6(+0x29d90) [0x748a77baad90] #38 /lib/x86_64-linux-gnu/libc.so.6(__libc_start_main+0x80) [0x748a77baae40] #39 ray::IDLE(+0x1c930e) [0x59f10ae8f30e] [getenv_preload] getenv name=OTEL_CPP_EXPORTER_OTLP_METRICS_RETRY_BACKOFF_MULTIPLIER [getenv_preload] backtrace: #0 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x10a9d17) [0x7321ce3c9d17] #1 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x10abe2b) [0x7321ce3cbe2b] #2 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1050ffc) [0x7321ce370ffc] #3 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x104f4d7) [0x7321ce36f4d7] #4 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1045833) [0x7321ce365833] #5 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xa6c760) [0x7321cdd8c760] #6 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xe69d9a) [0x7321ce189d9a] #7 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZN3ray3rpc14ClientCallImplINS0_16HealthCheckReplyEE15OnReplyReceivedEv+0x165) [0x7321ce18c005] #8 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(_ZNSt17_Function_handlerIFvvEZN3ray3rpc17ClientCallManager29PollEventsFromCompletionQueueEiEUlvE_E9_M_invokeERKSt9_Any_data+0x15) [0x7321cdd8e475] #9 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x103e3eb) [0x7321ce35e3eb] #10 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x1034f0b) [0x7321ce354f0b] #11 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xb6f21b) [0x7321cde8f21b] #12 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x15893cb) [0x7321ce8a93cb] #13 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158ad69) [0x7321ce8aad69] #14 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0x158b472) [0x7321ce8ab472] #15 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xa6bb54) [0x7321cdd8bb54] #16 /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so(+0xba2250) [0x7321cdec2250] #17 /lib/x86_64-linux-gnu/libc.so.6(+0x94ac3) [0x7321cf66eac3] #18 /lib/x86_64-linux-gnu/libc.so.6(+0x1268d0) [0x7321cf7008d0] *** SIGSEGV received at time=1770862205 on cpu 1 *** PC: @ 0x748a77bc5c1d (unknown) getenv @ 0x748a77bc3520 (unknown) (unknown) {"asctime":"2026-02-11 18:10:05,910","levelname":"E","message":"*** SIGSEGV received at time=1770862205 on cpu 1 ***","filename":"logging.cc","lineno":474} {"asctime":"2026-02-11 18:10:05,910","levelname":"E","message":"PC: @ 0x748a77bc5c1d (unknown) getenv","filename":"logging.cc","lineno":474} {"asctime":"2026-02-11 18:10:05,910","levelname":"E","message":" @ 0x748a77bc3520 (unknown) (unknown)","filename":"logging.cc","lineno":474} Fatal Python error: Segmentation fault ``` According to the backtrace, we can identify that it is the `OtlpGrpcMetricExporterOptions`, [which called `getenv(OTEL_CPP_EXPORTER_OTLP_METRICS_RETRY_BACKOFF_MULTIPLIER)`](https://github.com/open-telemetry/opentelemetry-cpp/blob/13ad05a6f431efb76995cffb1225d26b45374749/exporters/otlp/src/otlp_grpc_metric_exporter_options.cc#L47), getting initialized by calling `InitOpenTelemetryExporter` in the `metrics_agent_client_->WaitForServerReady()` callback, that causes the issue. This PR moves `OtlpGrpcMetricExporterOptions` into `OpenTelemetryMetricRecorder` (so that we keep otel details encapsulated) and moves its initialization early to `stats::Init()`, to force the `OtlpGrpcMetricExporterOptions` to be initialized early, so that we don't call `getenv` afterward. --------- Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
pull bot
pushed a commit
that referenced
this pull request
Feb 28, 2026
## Description
grpc 1.57.1 will call `GetEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG")`
on every grpc channel establishment for parsing load-balancing policy.
This causes race conditions between user tasks as they are allowed to do
setenv at anytime. This PR upgrades the grpc lib to 1.58.0 to get rid of
the `GetEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG")`.
```
(gdb) bt
#0 __pthread_kill_implementation (no_tid=0, signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:44
#1 __pthread_kill_internal (signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:78
#2 __GI___pthread_kill (threadid=129183804413504, signo=signo@entry=11) at ./nptl/pthread_kill.c:89
#3 0x00007580a7545476 in __GI_raise (sig=11) at ../sysdeps/posix/raise.c:26
#4 <signal handler called>
#5 __pthread_kill_implementation (no_tid=0, signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:44
#6 __pthread_kill_internal (signo=11, threadid=129183804413504) at ./nptl/pthread_kill.c:78
#7 __GI___pthread_kill (threadid=129183804413504, signo=signo@entry=11) at ./nptl/pthread_kill.c:89
#8 0x00007580a7545476 in __GI_raise (sig=11) at ../sysdeps/posix/raise.c:26
#9 <signal handler called>
#10 __GI_getenv (name=0x7580a6a078c2 "PC_EXPERIMENTAL_PICKFIRST_LB_CONFIG") at ./stdlib/getenv.c:84
#11 0x00007580a67e8b8a in grpc_core::GetEnv(char const*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#12 0x00007580a649601f in grpc_core::ShufflePickFirstEnabled() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#13 0x00007580a64960ed in grpc_core::json_detail::FinishedJsonObjectLoader<grpc_core::(anonymous namespace)::PickFirstConfig, 1ul, void>::LoadInto(grpc_core::experimental::Json const&, grpc_core::JsonArgs const&, void*, grpc_core::ValidationErrors*) const () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#14 0x00007580a6787384 in grpc_core::json_detail::LoadWrapped::LoadInto(grpc_core::experimental::Json const&, grpc_core::JsonArgs const&, void*, grpc_core::ValidationErrors*) const ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#15 0x00007580a6497b07 in grpc_core::(anonymous namespace)::PickFirstFactory::ParseLoadBalancingConfig(grpc_core::experimental::Json const&) const ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#16 0x00007580a67c18a7 in grpc_core::LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(grpc_core::experimental::Json const&) const ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#17 0x00007580a66ad9b8 in grpc_core::ClientChannel::OnResolverResultChangedLocked(grpc_core::Resolver::Result) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#18 0x00007580a66ae452 in grpc_core::ClientChannel::ResolverResultHandler::ReportResult(grpc_core::Resolver::Result) ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#19 0x00007580a63bc603 in grpc_core::PollingResolver::OnRequestCompleteLocked(grpc_core::Resolver::Result) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#20 0x00007580a63bcb2d in std::_Function_handler<void (), grpc_core::PollingResolver::OnRequestComplete(grpc_core::Resolver::Result)::{lambda()#1}>::_M_invoke(std::_Any_data const&)
() from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#21 0x00007580a67cbf46 in grpc_core::WorkSerializer::WorkSerializerImpl::Run(std::function<void ()>, grpc_core::DebugLocation const&) ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#22 0x00007580a67cc0ea in grpc_core::WorkSerializer::Run(std::function<void ()>, grpc_core::DebugLocation const&) ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#23 0x00007580a63bd117 in grpc_core::PollingResolver::OnRequestComplete(grpc_core::Resolver::Result) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#24 0x00007580a63b3f86 in grpc_core::(anonymous namespace)::AresClientChannelDNSResolver::AresRequestWrapper::OnHostnameResolved(void*, absl::lts_20230802::Status) ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#25 0x00007580a67c44c4 in grpc_core::ExecCtx::Flush() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#26 0x00007580a63408a2 in grpc_core::ExecCtx::~ExecCtx() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#27 0x00007580a6740343 in grpc_call_start_batch () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#28 0x00007580a5e281e9 in grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpRecvInitialMetadata, grpc::internal::CallOpRecvMessage<google::protobuf::MessageLite>, grpc::internal::CallOpClientSendClose, grpc::internal::CallOpClientRecvStatus>::ContinueFillOpsAfterInterception() ()
from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#29 0x00007580a5e2d809 in grpc::internal::BlockingUnaryCallImpl<google::protobuf::MessageLite, google::protobuf::MessageLite>::BlockingUnaryCallImpl(grpc::ChannelInterface*, grpc::inte--Type <RET> for more, q to quit, c to continue without paging--c
rnal::RpcMethod const&, grpc::ClientContext*, google::protobuf::MessageLite const&, google::protobuf::MessageLite*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#30 0x00007580a62d76ea in opentelemetry::proto::collector::metrics::v1::MetricsService::Stub::Export(grpc::ClientContext*, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest const&, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#31 0x00007580a62ca40c in opentelemetry::v1::exporter::otlp::OtlpGrpcClient::DelegateExport(opentelemetry::proto::collector::metrics::v1::MetricsService::StubInterface*, std::unique_ptr<grpc::ClientContext, std::default_delete<grpc::ClientContext> >&&, std::unique_ptr<google::protobuf::Arena, std::default_delete<google::protobuf::Arena> >&&, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest&&, opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse*) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#32 0x00007580a62c23ed in opentelemetry::v1::exporter::otlp::OtlpGrpcMetricExporter::Export(opentelemetry::v1::sdk::metrics::ResourceMetrics const&) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#33 0x00007580a62c0334 in (anonymous namespace)::OpenTelemetryMetricExporter::Export(opentelemetry::v1::sdk::metrics::ResourceMetrics const&) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#34 0x00007580a62e5fdf in opentelemetry::v1::sdk::metrics::PeriodicExportingMetricReader::CollectAndExportOnce()::{lambda()#1}::operator()() const::{lambda(opentelemetry::v1::sdk::metrics::ResourceMetrics&)#1}::operator()(opentelemetry::v1::sdk::metrics::ResourceMetrics&) const () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#35 0x00007580a62ee7a6 in opentelemetry::v1::sdk::metrics::MetricReader::Collect(opentelemetry::v1::nostd::function_ref<bool (opentelemetry::v1::sdk::metrics::ResourceMetrics&)>) () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#36 0x00007580a62e5085 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<opentelemetry::v1::sdk::metrics::PeriodicExportingMetricReader::CollectAndExportOnce()::{lambda()#1}> > >::_M_run() () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#37 0x00007580a6997be0 in execute_native_thread_routine () from /home/ray/anaconda3/lib/python3.10/site-packages/ray/_raylet.so
#38 0x00007580a7597ac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#39 0x00007580a76298d0 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81
```
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
See Commits and Changes for more details.
Created by
pull[bot] (v2.0.0-alpha.1)
Can you help keep this open source service alive? 💖 Please sponsor : )