Skip to content

Automated fix for refs/heads/fix/39583#3

Open
github-actions[bot] wants to merge 2 commits into
fix/39583from
create-pull-request/patch-c2f0e6f
Open

Automated fix for refs/heads/fix/39583#3
github-actions[bot] wants to merge 2 commits into
fix/39583from
create-pull-request/patch-c2f0e6f

Conversation

@github-actions

Copy link
Copy Markdown

PanCakes to the rescue!

We noticed that our 'sanity' test was going to fail, but we think we can fix that automatically, so we put together this PR to do just that!

If you'd like to opt-out of these PR's, add yourself to NO_AUTOFIX_USERS in .github/workflows/pr-auto-fix.yaml

asheshvidyut and others added 2 commits May 19, 2025 18:35
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
asheshvidyut pushed a commit that referenced this pull request Jun 6, 2025
Solve the thundering herd problem that was caused by the previous wakeup logic (now that that's becoming a problem)

This change adapts the algorithm in mpscq (originally from dvyukov) for promises, and adds some useful functionality on top.

The queue is closable: it's possible for the sender or receiver to unilaterally close the queue and force errors on send and receive (and all existing work thrown away).

The queue is bounded: pushback is applied at the sender if the queue is too full. An API exists to allow readers of the queue to persist the tokens queued in the MPSC until the work item is complete sometime after - chaotic good will use this to allow messages to be kept outstanding until they are actually written to a socket.

To arrange all this, the queue is split into three parts:

1. An unbounded MPSC of incoming data. If there is too much data queued we set a blocking bit, and the queuing promise will block until that bit is cleared. In the case of waiters, where the original mpscq would return nullptr, the new queue CAS's in a Waker object so we can get exact wakeup pingponging between reader and writer when that is appropriate.

2. A single threaded SPSC of "accepted" items. This (plus the items in #3) form the bound of the queue. When we are reading and under bound we pull in as many nodes as possible early (so as to kick start other systems to producing the next batch of work), and queue it in a separate SPSC. Next() can then pull from that directly for a small period.

3. A sea of accepted but not complete work items. The Queued<> type tracks these. When the objects are released, we finally release the tokens, and in doing so allow more work to be accepted.

PiperOrigin-RevId: 766368817
asheshvidyut pushed a commit that referenced this pull request Jun 10, 2025
This fixes a bug introduced in grpc#39736 whereby the LRS server may call `StartWrite()` after cancellation has already called `Finish()`, thus leading to a TSAN failure.

This is arguably a deficiency in the callback API.  The write should definitely fail, but it shouldn't cause a TSAN problem.  But we can consider that as part of a separate effort.

Example stack trace of the TSAN failure:

```
WARNING: ThreadSanitizer: data race (pid=17)
  Read of size 1 at 0x72680002dd80 by thread T5:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:742:18 (xds_cluster_end2end_test@poller=epoll1+0x32896b2)
    #1 grpc::ServerBidiReactor::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/support/server_callback.h:414:13 (xds_cluster_end2end_test@poller=epoll1+0x328dbb4)
    #2 grpc::testing::LrsServiceImpl::Reactor::OnCancel() /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:494:3 (xds_cluster_end2end_test@poller=epoll1+0x32c6ec1)
    #3 grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0::operator()() const /proc/self/cwd/src/cpp/server/server_callback.cc:38:14 (xds_cluster_end2end_test@poller=epoll1+0x3b4755b)
    #4 void std::__invoke_impl(std::__invoke_other, grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3b474d5)
    #5 std::__invoke_result::type std::__invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47485)
    #6 std::invoke_result::type std::invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47435)
    #7 void absl::lts_20250127::internal_any_invocable::InvokeR(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x3b473d5)
    #8 void absl::lts_20250127::internal_any_invocable::LocalInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:310:10 (xds_cluster_end2end_test@poller=epoll1+0x3b472f2)
    #9 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    #10 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    #11 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    #12 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    #13 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    #14 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    #15 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    #16 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)

  Previous write of size 1 at 0x72680002dd80 by thread T4:
    #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Write(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:790:38 (xds_cluster_end2end_test@poller=epoll1+0x3289e85)
    #1 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/support/server_callback.h:350:13 (xds_cluster_end2end_test@poller=epoll1+0x32ef139)
    #2 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*) /proc/self/cwd/include/grpcpp/support/server_callback.h:328:5 (xds_cluster_end2end_test@poller=epoll1+0x32cb6df)
    #3 grpc::testing::LrsServiceImpl::Reactor::OnReadDone(bool) /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:461:5 (xds_cluster_end2end_test@poller=epoll1+0x32c6535)
    #4 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)::operator()(bool) const /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:839:22 (xds_cluster_end2end_test@poller=epoll1+0x328f196)
    #5 std::_Function_handler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)>::_M_invoke(std::_Any_data const&, bool&&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (xds_cluster_end2end_test@poller=epoll1+0x328ef1a)
    #6 std::function::operator()(bool) const /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (xds_cluster_end2end_test@poller=epoll1+0x3279bb8)
    #7 grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'()::operator()() const /proc/self/cwd/include/grpcpp/support/callback_common.h:230:11 (xds_cluster_end2end_test@poller=epoll1+0x3279ae5)
    #8 void std::__invoke_impl(std::__invoke_other, grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a65)
    #9 std::__invoke_result::type std::__invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a15)
    #10 std::invoke_result::type std::invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x32799c5)
    #11 void absl::lts_20250127::functional_internal::InvokeObject(absl::lts_20250127::functional_internal::VoidPtr) /proc/self/cwd/external/com_google_absl/absl/functional/internal/function_ref.h:78:7 (xds_cluster_end2end_test@poller=epoll1+0x327996d)
    #12 absl::lts_20250127::FunctionRef::operator()() const /proc/self/cwd/external/com_google_absl/absl/functional/function_ref.h:132:12 (xds_cluster_end2end_test@poller=epoll1+0x3bd0221)
    #13 void grpc::GlobalCallbackHook::CatchingCallback&>(absl::lts_20250127::FunctionRef&) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:41:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd01a1)
    #14 grpc::DefaultGlobalCallbackHook::RunCallback(grpc_call*, absl::lts_20250127::FunctionRef) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:50:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd011d)
    #15 grpc::internal::CallbackWithSuccessTag::Run(bool) /proc/self/cwd/include/grpcpp/support/callback_common.h:227:32 (xds_cluster_end2end_test@poller=epoll1+0x3279715)
    #16 grpc::internal::CallbackWithSuccessTag::StaticRun(grpc_completion_queue_functor*, int) /proc/self/cwd/include/grpcpp/support/callback_common.h:212:47 (xds_cluster_end2end_test@poller=epoll1+0x3279329)
    #17 cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0::operator()() const /proc/self/cwd/src/core/lib/surface/completion_queue.cc:913:9 (xds_cluster_end2end_test@poller=epoll1+0x545e2e3)
    #18 void std::__invoke_impl(std::__invoke_other, cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x545e205)
    #19 std::__invoke_result::type std::__invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x545e1b5)
    #20 std::invoke_result::type std::invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x545e165)
    #21 void absl::lts_20250127::internal_any_invocable::InvokeR(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x545e115)
    #22 void absl::lts_20250127::internal_any_invocable::RemoteInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:368:10 (xds_cluster_end2end_test@poller=epoll1+0x545df6d)
    #23 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef)
    #24 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad)
    #25 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663)
    #26 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47)
    #27 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049)
    #28 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9)
    #29 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0)
    #30 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819)
```

Closes grpc#39806

COPYBARA_INTEGRATE_REVIEW=grpc#39806 from markdroth:lrs_server_callback_fix e10ed85
PiperOrigin-RevId: 769289887
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant