add class DSTensorTransport#1
Merged
Vito-Yang merged 1 commit intoVito-Yang:ds-backendfrom Oct 22, 2025
Merged
Conversation
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Vito-Yang
pushed a commit
that referenced
this pull request
Dec 5, 2025
…BRT: "corrupted size vs. prev_size") (ray-project#58660) ## Summary This PR fixes a heap corruption bug that causes the driver to crash with SIGABRT. The issue is caused by a use-after-free when the `RayletClient` object is destroyed while an asynchronous RPC callback is still pending. ## Problem Description ### Scenario A Ray Data job (Ray 2.50.0) with pipeline `read_parquet -> filter -> map_batches -> write` running for 4+ hours, where workers use elastic resources with low job priority causing frequent worker deaths due to pod preemption, crashes the driver with SIGABRT: ``` corrupted size vs. prev_size *** SIGABRT received at time=1761916578 on cpu 30 *** PC: @ 0x7f073569d9fc (unknown) pthread_kill Aborted (core dumped) ``` ### Trigger Conditions After reproducing with an ASan image, Asan reveals the actual use-after-free at: ``` #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628 #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377 #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338 #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61 #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111 #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590 #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293 #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61 ray-project#9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111 ray-project#10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290 ray-project#11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590 ray-project#12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109 ray-project#13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30 ray-project#14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 ray-project#15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 ray-project#16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 2025-11-14 16:15:05,608 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} ray-project#17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 ray-project#18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112 ray-project#19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110 ray-project#20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 ray-project#21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 ray-project#22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 ray-project#23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 ray-project#24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60 ray-project#25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88 ray-project#26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 ray-project#27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111 ray-project#28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54 ray-project#29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70 ray-project#30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40 ray-project#31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492 ray-project#32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210 ray-project#33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63 2025-11-14 16:15:05,742 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} ray-project#34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193 ray-project#35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120 ray-project#36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179 ray-project#37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442 ray-project#38 0x7ff28b0a58bf (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf) 0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38) freed by thread T68 here: 2025-11-14 16:15:05,876 INFO streaming_executor_state.py:511 -- Running activate tasks is {'MapBatches(QwenInfer)': ['MapBatches(QwenInfer)-79153', 'MapBatches(QwenInfer)-80170', 'MapBatches(QwenInfer)-80225', 'MapBatches(QwenInfer)-80299', 'MapBatches(QwenInfer)-82624'], 'MapBatches(drop_columns)->Write': ['MapBatches(drop_columns)->Write-25244', 'MapBatches(drop_columns)->Write-34438', 'MapBatches(drop_columns)->Write-34439', 'MapBatches(drop_columns)->Write-34440', 'MapBatches(drop_columns)->Write-34441']} #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172 #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145 #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496 #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74 #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538 #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184 #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705 #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154 #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122 ray-project#9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211 ray-project#10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168 ray-project#11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535 ray-project#12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421 ray-project#13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578 ray-project#14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50 ray-project#15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183 ray-project#16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114 ray-project#17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69 ray-project#18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61 ray-project#19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111 ray-project#20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290 ray-project#21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590 ray-project#22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85 ray-project#23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45 ``` This is a **non-deterministic race condition** that occurs under the following sequence: 1. Worker A's pod is preempted → Worker A dies 2. Objects on Worker A are lost 3. Objects are found on Worker B → `PinObjectIDs` RPC is initiated 4. Worker B dies or becomes unavailable → `CheckChannelStatus` detects this → `Disconnect` is called 5. The `RayletClient` corresponding to Worker B on the driver is destroyed 6. RPC callback executes and accesses the already-freed `RayletClient` → use-after-free triggers crash Whether the use-after-free occurs depends on the relative timing of steps 5 and 6. In scenarios with frequent pod preemptions, object recovery frequently triggers `PinObjectIDs`, making this race condition more likely to occur. ### Root Cause In `RayletClient::PinObjectIDs`, the RPC callback lambda directly captured the raw `this` pointer: ```cpp auto rpc_callback = [this, callback = std::move(callback)](...) { pins_in_flight_--; // Accessing member via 'this' pointer ... }; ``` If the `RayletClient` object is destroyed before the async RPC callback executes, the callback will access freed memory through the dangling `this` pointer, leading to heap corruption and SIGABRT with the error message "corrupted size vs. prev_size". ## Solution The fix ensures that the `RayletClient` object remains alive during the asynchronous callback execution by: 1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The class already inherits from this base class (line 43 in `raylet_client.h`), which enables safe shared pointer management. 2. **Capturing `shared_from_this()` in the lambda**: Instead of capturing the raw `this` pointer, the callback now captures a `shared_ptr` to the object. The `shared_from_this()` is called before incrementing `pins_in_flight_` to ensure proper lifetime management: ```cpp // Capture shared_from_this() before incrementing to ensure object lifetime // is extended for the async callback, preventing use-after-free. auto self = shared_from_this(); pins_in_flight_++; auto rpc_callback = [self, callback = std::move(callback)]( Status status, rpc::PinObjectIDsReply &&reply) { self->pins_in_flight_--; callback(status, std::move(reply)); }; ``` This ensures that the `RayletClient` object's lifetime is extended until the callback completes, preventing the use-after-free bug. By capturing the shared pointer before incrementing the counter, we also ensure that if `shared_from_this()` were to fail (though it shouldn't in normal usage), we don't leave the counter in an inconsistent state. ## Code Changes - **File**: `src/ray/raylet_rpc_client/raylet_client.cc` - **Method**: `RayletClient::PinObjectIDs` - **Change**: Replace `this` capture with `shared_from_this()` capture in the RPC callback lambda Signed-off-by: dragongu <andrewgu@vip.qq.com> Co-authored-by: gulonglong <gulonglong@stepfun.com>
Vito-Yang
pushed a commit
that referenced
this pull request
Feb 4, 2026
…c callback during shutdown (ray-project#60048) ## Description When a Ray worker process shuts down (e.g., during `ray.shutdown()` or node termination), the OpenTelemetry `PeriodicExportingMetricReader`'s background thread may still be invoking the gauge callback (`_DoubleGaugeCallback`), which then accesses already-destroyed member data, resulting in a use-after-free crash. The error message: ``` (bundle_reservation_check_func pid=1543823) pure virtual method called (bundle_reservation_check_func pid=1543823) __cxa_deleted_virtual ``` I looked further into this, and ideally, at the OpenTelemetry code level, shutdown should be handled correctly. [PeriodicExportingMetricReader's shutdown](https://github.com/open-telemetry/opentelemetry-cpp/blob/f33dcc07c56c7e3b18fd18e13986f0eda965d116/sdk/src/metrics/export/periodic_exporting_metric_reader.cc#L292-L299) waits for `worker_thread_` to finish. ```c bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept { if (worker_thread_.joinable()) { cv_.notify_all(); worker_thread_.join(); } return exporter_->Shutdown(timeout); } ``` And callback(`worker_thread_`) is in a [while (IsShutdown() != true)](https://github.com/open-telemetry/opentelemetry-cpp/blob/main/sdk/src/metrics/export/periodic_exporting_metric_reader.cc#L147) loop. Therefore, there should be no use-after-free race condition at the OpenTelemetry code level, and it should be safe to call `meter_provider_->Shutdown()`. However, the issue is that the last callback appears to access member data that has already been destroyed during ForceFlush, which is called before Shutdown. This member data belongs to the OpenTelemetry SDK itself. The more I look into it, the more it feels like this is actually a bug in the OpenTelemetry SDK. And even further, I found this:[[SDK] Use shared_ptr internally for AttributesProcessor to prevent use-after-free ](open-telemetry/opentelemetry-cpp#3457) Which is exactly the issue I encountered! This PR upgrade the OpenTelemetry C++ SDK version to include this fix. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information It is quit easy to reproduced, For example, if we manually running the `test_placement_group_reschedule_node_dead` in `python/ray/autoscaler/v2/tests/test_e2e.py`. ``` (docs) ubuntu@devbox:~/ray$ pkill -9 -f raylet 2>/dev/null || true; pkill -9 -f gcs_server 2>/dev/null || true; ray stop --force 2>/dev/null || true; sleep 2 Did not find any active Ray processes. (docs) ubuntu@devbox:~/ray$ timeout 180 python -m pytest python/ray/autoscaler/v2/tests/test_e2e.py::test_placement_group_reschedule_node_dead -xvs 2>&1 | tee /tmp/test_otel.txt; echo "EXIT CODE: $?" ............ __cxa_deleted_virtual opentelemetry::v1::sdk::metrics::FilteredOrderedAttributeMap::FilteredOrderedAttributeMap()::{lambda()#1}::operator()() opentelemetry::v1::nostd::function_ref<>::BindTo<>()::{lambda()#1}::operator()() opentelemetry::v1::sdk::metrics::ObserverResultT<>::Observe() opentelemetry::v1::metrics::ObserverResultT<>::Observe<>() ray::observability::OpenTelemetryMetricRecorder::CollectGaugeMetricValues() (anonymous namespace)::_DoubleGaugeCallback() opentelemetry::v1::sdk::metrics::ObservableRegistry::Observe() opentelemetry::v1::sdk::metrics::Meter::Collect() opentelemetry::v1::sdk::metrics::MetricCollector::Produce() opentelemetry::v1::sdk::metrics::MetricReader::Collect() opentelemetry::v1::sdk::metrics::PeriodicExportingMetricReader::CollectAndExportOnce() std::thread::_State_impl<>::_M_run() ............ ``` after this pr, no such error message: ``` (docs) ubuntu@devbox:~/ray$ timeout 180 python -m pytest python/ray/autoscaler/v2/tests/test_e2e.py::test_placement_group_reschedule_node_dead -xvs 2>&1 | tee /tmp/test_otel.txt; echo "EXIT CODE: $?" ============================= test session starts ============================== platform linux -- Python 3.12.12, pytest-9.0.2, pluggy-1.6.0 -- /home/ubuntu/.conda/envs/docs/bin/python cachedir: .pytest_cache rootdir: /home/ubuntu/ray configfile: pytest.ini plugins: asyncio-1.3.0, anyio-4.11.0 asyncio: mode=Mode.STRICT, debug=False, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function collecting ... collected 2 items python/ray/autoscaler/v2/tests/test_e2e.py::test_placement_group_reschedule_node_dead[v1] Did not find any active Ray processes. Usage stats collection is enabled. To disable this, add `--disable-usage-stats` to the command that starts the cluster, or run the following command: `ray disable-usage-stats` before starting the cluster. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details. Local node IP: 172.31.5.171 -------------------- Ray runtime started. -------------------- Next steps To add another node to this Ray cluster, run ray start --address='172.31.5.171:6379' To connect to this Ray cluster: import ray ray.init() To submit a Ray job using the Ray Jobs CLI: RAY_API_SERVER_ADDRESS='http://127.0.0.1:8265' ray job submit --working-dir . -- python my_script.py See https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html for more information on submitting Ray jobs to the Ray cluster. To terminate the Ray runtime, run ray stop To view the status of the cluster, use ray status To monitor and debug Ray, view the dashboard at 127.0.0.1:8265 If connection to the dashboard fails, check your firewall settings and network configuration. 2026-01-12 12:30:00,347 INFO worker.py:1826 -- Connecting to existing Ray cluster at address: 172.31.5.171:6379... 2026-01-12 12:30:00,385 INFO worker.py:2006 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265 (autoscaler +11s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0. (autoscaler +11s) Resized to 0 CPUs. (autoscaler +12s) Resized to 0 CPUs. (autoscaler +14s) Resized to 0 CPUs. (autoscaler +15s) Resized to 0 CPUs. (autoscaler +15s) Adding 1 node(s) of type type-1. (autoscaler +15s) Adding 1 node(s) of type type-2. (autoscaler +15s) Adding 1 node(s) of type type-3. (autoscaler +15s) Adding 1 node(s) of type type-1. (autoscaler +15s) Adding 1 node(s) of type type-2. (autoscaler +15s) Adding 1 node(s) of type type-3. (autoscaler +15s) Adding 1 node(s) of type type-1. (autoscaler +15s) Adding 1 node(s) of type type-2. (autoscaler +15s) Adding 1 node(s) of type type-3. (autoscaler +15s) Adding 1 node(s) of type type-1. (autoscaler +15s) Adding 1 node(s) of type type-2. (autoscaler +15s) Adding 1 node(s) of type type-3. (autoscaler +16s) Resized to 0 CPUs. (autoscaler +16s) Adding 1 node(s) of type type-1. (autoscaler +16s) Adding 1 node(s) of type type-2. (autoscaler +16s) Adding 1 node(s) of type type-3. Killing pids 1566233 (raylet) Raylet is terminated. Termination is unexpected. Possible reasons include: (1) SIGKILL by the user or system OOM killer, (2) Invalid memory access from Raylet causing SIGSEGV or SIGBUS, (3) Other termination signals. Last 20 lines of the Raylet logs: [state-dump] ray::rpc::InternalKVGcsService.grpc_client.GetInternalConfig.OnReplyReceived - 1 total (0 active), Execution time: mean = 880.39ms, total = 880.39ms, Queueing time: mean = 0.06ms, max = 0.06ms, min = 0.06ms, total = 0.06ms [state-dump] ClusterResourceManager.ResetRemoteNodeView - 1 total (1 active), Execution time: mean = 0.00ms, total = 0.00ms, Queueing time: mean = 0.00ms, max = -0.00ms, min = 9223372036854.78ms, total = 0.00ms [state-dump] DebugString() time ms: 1 [state-dump] [state-dump] [2026-01-12 12:29:59,875 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00000 [2026-01-12 12:30:00,447 I 1565894 1565917] (raylet) object_store.cc:37: Object store current usage 8e-09 / 27.3914 GB. [2026-01-12 12:30:00,453 I 1565894 1565894] (raylet) worker_pool.cc:733: Job 01000000 already started in worker pool. [2026-01-12 12:30:02,834 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00001 [2026-01-12 12:30:02,851 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:03,995 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00002 [2026-01-12 12:30:04,012 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:05,178 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00003 [2026-01-12 12:30:05,197 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 1, dropped message version: 1 [2026-01-12 12:30:05,215 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:05,254 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:05,297 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:05,315 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 6, dropped message version: 6 [2026-01-12 12:30:05,716 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 7, dropped message version: 7 [2026-01-12 12:30:05,817 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 9, dropped message version: 9 (autoscaler +17s) Adding 1 node(s) of type type-3. (autoscaler +17s) Adding 1 node(s) of type type-3. (autoscaler +17s) Adding 1 node(s) of type type-3. (autoscaler +17s) Adding 1 node(s) of type type-3. (autoscaler +17s) Adding 1 node(s) of type type-3. (autoscaler +24s) Removing 1 nodes of type type-3 (idle). (autoscaler +24s) Removing 1 nodes of type type-3 (idle). (autoscaler +24s) Removing 1 nodes of type type-3 (idle). (autoscaler +24s) Removing 1 nodes of type type-3 (idle). (raylet) The node with node id: fffffffffffffffffffffffffffffffffffffffffffffffffff00001 and address: 172.31.5.171 and node name: 172.31.5.171 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a (1) raylet crashes unexpectedly (OOM, etc.) (2) raylet has lagging heartbeats due to slow network or busy workload. (raylet) Raylet is terminated. Termination is unexpected. Possible reasons include: (1) SIGKILL by the user or system OOM killer, (2) Invalid memory access from Raylet causing SIGSEGV or SIGBUS, (3) Other termination signals. Last 20 lines of the Raylet logs: [state-dump] ray::rpc::InternalKVGcsService.grpc_client.GetInternalConfig.OnReplyReceived - 1 total (0 active), Execution time: mean = 880.39ms, total = 880.39ms, Queueing time: mean = 0.06ms, max = 0.06ms, min = 0.06ms, total = 0.06ms [state-dump] ClusterResourceManager.ResetRemoteNodeView - 1 total (1 active), Execution time: mean = 0.00ms, total = 0.00ms, Queueing time: mean = 0.00ms, max = -0.00ms, min = 9223372036854.78ms, total = 0.00ms [state-dump] DebugString() time ms: 1 [state-dump] [state-dump] [2026-01-12 12:29:59,875 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00000 [2026-01-12 12:30:00,447 I 1565894 1565917] (raylet) object_store.cc:37: Object store current usage 8e-09 / 27.3914 GB. [2026-01-12 12:30:00,453 I 1565894 1565894] (raylet) worker_pool.cc:733: Job 01000000 already started in worker pool. [2026-01-12 12:30:02,834 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00001 [2026-01-12 12:30:02,851 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:03,995 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00002 [2026-01-12 12:30:04,012 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:05,178 I 1565894 1565894] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00003 [2026-01-12 12:30:05,197 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 1, dropped message version: 1 [2026-01-12 12:30:05,215 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:05,254 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:05,297 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:05,315 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 6, dropped message version: 6 [2026-01-12 12:30:05,716 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 7, dropped message version: 7 [2026-01-12 12:30:05,817 I 1565894 1565894] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 9, dropped message version: 9 Stopped all 10 Ray processes. (autoscaler +32s) Resized to 0 CPUs. (autoscaler +32s) Adding 1 node(s) of type type-1. (autoscaler +32s) Adding 1 node(s) of type type-2. (autoscaler +32s) Adding 1 node(s) of type type-3. (autoscaler +32s) Adding 1 node(s) of type type-3. (autoscaler +32s) Removing 1 nodes of type type-3 (idle). PASSED python/ray/autoscaler/v2/tests/test_e2e.py::test_placement_group_reschedule_node_dead[v2] Did not find any active Ray processes. Usage stats collection is enabled. To disable this, add `--disable-usage-stats` to the command that starts the cluster, or run the following command: `ray disable-usage-stats` before starting the cluster. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details. Local node IP: 172.31.5.171 -------------------- Ray runtime started. -------------------- Next steps To add another node to this Ray cluster, run ray start --address='172.31.5.171:6379' To connect to this Ray cluster: import ray ray.init() To submit a Ray job using the Ray Jobs CLI: RAY_API_SERVER_ADDRESS='http://127.0.0.1:8265' ray job submit --working-dir . -- python my_script.py See https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html for more information on submitting Ray jobs to the Ray cluster. To terminate the Ray runtime, run ray stop To view the status of the cluster, use ray status To monitor and debug Ray, view the dashboard at 127.0.0.1:8265 If connection to the dashboard fails, check your firewall settings and network configuration. 2026-01-12 12:30:40,170 INFO worker.py:1826 -- Connecting to existing Ray cluster at address: 172.31.5.171:6379... 2026-01-12 12:30:40,202 INFO worker.py:2006 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265 Stopped only 9 out of 12 Ray processes within the grace period 16 seconds. Set `-v` to see more details. Remaining processes [psutil.Process(pid=1569612, name='raylet', status='terminated'), psutil.Process(pid=1569160, name='raylet', status='terminated'), psutil.Process(pid=1568952, name='raylet', status='terminated')] will be forcefully terminated. You can also use `--force` to forcefully terminate processes or set higher `--grace-period` to wait longer time for proper termination. Killing pids 1568744 (raylet) Raylet is terminated. Termination is unexpected. Possible reasons include: (1) SIGKILL by the user or system OOM killer, (2) Invalid memory access from Raylet causing SIGSEGV or SIGBUS, (3) Other termination signals. Last 20 lines of the Raylet logs: [state-dump] NodeManager.deadline_timer.spill_objects_when_over_threshold - 1 total (1 active), Execution time: mean = 0.00ms, total = 0.00ms, Queueing time: mean = 0.00ms, max = -0.00ms, min = 9223372036854.78ms, total = 0.00ms [state-dump] DebugString() time ms: 0 [state-dump] [state-dump] [2026-01-12 12:30:39,701 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00000 [2026-01-12 12:30:40,257 I 1568506 1568529] (raylet) object_store.cc:37: Object store current usage 8e-09 / 27.3852 GB. [2026-01-12 12:30:40,262 I 1568506 1568506] (raylet) worker_pool.cc:733: Job 01000000 already started in worker pool. [2026-01-12 12:30:41,697 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00001 [2026-01-12 12:30:41,714 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:42,858 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00002 [2026-01-12 12:30:42,876 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:44,050 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00003 [2026-01-12 12:30:44,073 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:45,018 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 1, dropped message version: 1 [2026-01-12 12:30:45,076 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:45,079 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:45,119 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:45,177 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 6, dropped message version: 6 [2026-01-12 12:30:45,578 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 7, dropped message version: 7 [2026-01-12 12:30:45,679 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 9, dropped message version: 9 (raylet) The node with node id: fffffffffffffffffffffffffffffffffffffffffffffffffff00001 and address: 172.31.5.171 and node name: 172.31.5.171 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a (1) raylet crashes unexpectedly (OOM, etc.) (2) raylet has lagging heartbeats due to slow network or busy workload. (raylet) Raylet is terminated. Termination is unexpected. Possible reasons include: (1) SIGKILL by the user or system OOM killer, (2) Invalid memory access from Raylet causing SIGSEGV or SIGBUS, (3) Other termination signals. Last 20 lines of the Raylet logs: [state-dump] NodeManager.deadline_timer.spill_objects_when_over_threshold - 1 total (1 active), Execution time: mean = 0.00ms, total = 0.00ms, Queueing time: mean = 0.00ms, max = -0.00ms, min = 9223372036854.78ms, total = 0.00ms [state-dump] DebugString() time ms: 0 [state-dump] [state-dump] [2026-01-12 12:30:39,701 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00000 [2026-01-12 12:30:40,257 I 1568506 1568529] (raylet) object_store.cc:37: Object store current usage 8e-09 / 27.3852 GB. [2026-01-12 12:30:40,262 I 1568506 1568506] (raylet) worker_pool.cc:733: Job 01000000 already started in worker pool. [2026-01-12 12:30:41,697 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00001 [2026-01-12 12:30:41,714 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:42,858 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00002 [2026-01-12 12:30:42,876 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:44,050 I 1568506 1568506] (raylet) accessor.cc:436: Received address and liveness notification for node, IsAlive = 1 node_id=fffffffffffffffffffffffffffffffffffffffffffffffffff00003 [2026-01-12 12:30:44,073 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 0, dropped message version: 0 [2026-01-12 12:30:45,018 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 1, dropped message version: 1 [2026-01-12 12:30:45,076 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:45,079 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:45,119 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 5, dropped message version: 5 [2026-01-12 12:30:45,177 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 6, dropped message version: 6 [2026-01-12 12:30:45,578 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 7, dropped message version: 7 [2026-01-12 12:30:45,679 I 1568506 1568506] (raylet) ray_syncer_bidi_reactor_base.h:76: Dropping sync message with stale version. latest version: 9, dropped message version: 9 PASSED ========================= 2 passed in 80.90s (0:01:20) ========================= EXIT CODE: 0 (docs) ubuntu@devbox:~/ray$ ``` Signed-off-by: yicheng <yicheng@anyscale.com> Co-authored-by: yicheng <yicheng@anyscale.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Related issues
Additional information