From fc23b06d29c3b5f0c8c01813b488f4cec07aa148 Mon Sep 17 00:00:00 2001 From: Sergey Pershin Date: Fri, 10 Apr 2026 20:55:50 -0700 Subject: [PATCH] fix(native): Fix worker shutdown sequence (#27566) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: We observed SIGSEGV crashes during the shutdown in one of the Regions. According to the AI analysis: The sequence during prolonged shutdown: 1. Server receives preemption signal, starts graceful shutdown 2. Tasks drain slowly over hours 3. Eventually shutdownServer() → joinExecutors() is called 4. driverExecutor_.reset() destroys the MonitoredExecutor (line 789) 5. But exchangeHttpCpuExecutor_ threads are still alive and processing exchange HTTP responses 6. A response callback calls requestPromise.setValue() → dispatches to the now-destroyed MonitoredExecutor 7. SIGSEGV The fix is to ensure all the threads are done in exchangeHttpCpuExecutor_. ``` == NO RELEASE NOTE == ``` Reviewed By: han-yan01 Differential Revision: D100429469 --- .../presto_cpp/main/PrestoServer.cpp | 26 ++- .../presto_cpp/main/tests/CMakeLists.txt | 1 + .../main/tests/ShutdownOrderTest.cpp | 174 ++++++++++++++++++ 3 files changed, 191 insertions(+), 10 deletions(-) create mode 100644 presto-native-execution/presto_cpp/main/tests/ShutdownOrderTest.cpp diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index e9780d36f83ad..b80cc7bcbb1b3 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -781,15 +781,31 @@ void PrestoServer::stopAnnouncer() { } void PrestoServer::joinExecutors() { + // Join exchange HTTP CPU executor first. Exchange CPU threads run + // PrestoExchangeSource::handleDataResponse which dispatches callbacks to + // driverExecutor_ (MonitoredExecutor) via ExchangeClient. We must drain + // these threads before destroying driverExecutor_ to avoid use-after-free. + PRESTO_SHUTDOWN_LOG(INFO) + << "Joining Exchange Http CPU executor '" + << exchangeHttpCpuExecutor_->getName() + << "': threads: " << exchangeHttpCpuExecutor_->numActiveThreads() << "/" + << exchangeHttpCpuExecutor_->numThreads(); + exchangeHttpCpuExecutor_->join(); + PRESTO_SHUTDOWN_LOG(INFO) << "Joining Driver CPU Executor '" << driverCpuExecutor_->getName() << "': threads: " << driverCpuExecutor_->numActiveThreads() << "/" << driverCpuExecutor_->numThreads() << ", task queue: " << driverCpuExecutor_->getTaskQueueSize(); + driverCpuExecutor_->join(); // Schedule release of SessionPools held by HttpClients before the exchange // HTTP IO executor threads are joined. driverExecutor_.reset(); + // Release exchange CPU executor resources after driverExecutor_ is reset, + // before exchange IO threads are joined. + exchangeHttpCpuExecutor_.reset(); + if (connectorCpuExecutor_) { PRESTO_SHUTDOWN_LOG(INFO) << "Joining Connector CPU Executor '" @@ -824,16 +840,6 @@ void PrestoServer::joinExecutors() { httpSrvIoExecutor_->join(); } - PRESTO_SHUTDOWN_LOG(INFO) - << "Joining Exchange Http CPU executor '" - << exchangeHttpCpuExecutor_->getName() - << "': threads: " << exchangeHttpCpuExecutor_->numActiveThreads() << "/" - << exchangeHttpCpuExecutor_->numThreads(); - exchangeHttpCpuExecutor_->join(); - // Schedule release of SessionPools held by HttpClients before the exchange - // HTTP IO executor threads are joined. - exchangeHttpCpuExecutor_.reset(); - if (exchangeSourceConnectionPool_) { // Connection pool needs to be destroyed after CPU threads are joined but // before IO threads are joined. diff --git a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt index b31ce9cf017be..40e539d990bf5 100644 --- a/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/tests/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable( PrestoToVeloxQueryConfigTest.cpp QueryContextCacheTest.cpp ServerOperationTest.cpp + ShutdownOrderTest.cpp TaskManagerTest.cpp QueryContextManagerTest.cpp TaskInfoTest.cpp diff --git a/presto-native-execution/presto_cpp/main/tests/ShutdownOrderTest.cpp b/presto-native-execution/presto_cpp/main/tests/ShutdownOrderTest.cpp new file mode 100644 index 0000000000000..d3a491d6c443c --- /dev/null +++ b/presto-native-execution/presto_cpp/main/tests/ShutdownOrderTest.cpp @@ -0,0 +1,174 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +namespace facebook::presto { +namespace { + +// Wraps another executor without overriding keepAliveAcquire/Release. +// This mimics MonitoredExecutor's behavior: folly futures holding a raw pointer +// via ExecutorKeepAlive cannot prevent this executor from being destroyed. +// When destroyed, the wrapper's memory is freed even if outstanding futures +// still reference it through raw pointers. +class NoKeepAliveExecutor : public folly::Executor { + public: + explicit NoKeepAliveExecutor(std::unique_ptr base) + : base_(std::move(base)) {} + + void add(folly::Func func) override { + base_->add(std::move(func)); + } + + private: + std::unique_ptr base_; +}; + +// Shared synchronization state between the test thread and async lambdas. +struct SyncState { + folly::EventCount exchangeReadyEvent; + std::atomic exchangeReadyFlag{false}; + folly::EventCount proceedEvent; + std::atomic proceedFlag{false}; + std::atomic completed{false}; +}; + +} // namespace + +// Verifies that the shutdown order in PrestoServer::joinExecutors() correctly +// drains exchangeHttpCpuExecutor_ threads before destroying driverExecutor_. +// +// Production scenario: +// - PrestoExchangeSource::handleDataResponse runs on exchangeHttpCpuExecutor_ +// - It calls requestPromise.setValue() which dispatches the ExchangeClient +// callback to driverExecutor_ (a MonitoredExecutor) via raw pointer +// - If driverExecutor_ is already destroyed, this is use-after-free +// +// This test forces the exact interleaving: +// 1. Exchange thread starts and signals it's ready +// 2. Test releases the exchange thread +// 3. Exchange thread dispatches to the driver executor via raw pointer +// 4. Shutdown drains exchange threads BEFORE destroying driver executor +// +// With the old shutdown order (driverExecutor_.reset() before +// exchangeHttpCpuExecutor_->join()), ASAN detects heap-use-after-free +// because the exchange callback calls add() on freed executor memory. +TEST(ShutdownOrderTest, exchangeCallbacksDrainBeforeDriverExecutorDestroyed) { + // Create driver executor wrapped in NoKeepAliveExecutor to match + // MonitoredExecutor's lack of keepalive token support. + auto driverExecutor = std::make_unique( + std::make_unique( + 2, std::make_shared("TestDriver"))); + + // Separate exchange CPU executor (like exchangeHttpCpuExecutor_ in prod). + auto exchangeCpu = std::make_unique( + 2, std::make_shared("TestExchangeCPU")); + + // Raw pointer, same as ExchangeClient::executor_ and the pointer stored + // inside folly future cores by .via(executor_). + auto* driverRawPtr = driverExecutor.get(); + + auto sync = std::make_shared(); + + // Simulate an in-flight exchange HTTP response callback running on + // exchangeCpuExecutor that dispatches work to driverExecutor. + // In production: handleDataResponse -> requestPromise.setValue() -> + // Core::doCallback -> MonitoredExecutor::add() + exchangeCpu->add([sync, driverRawPtr]() { + sync->exchangeReadyFlag = true; + sync->exchangeReadyEvent.notifyAll(); + sync->proceedEvent.await([sync]() { return sync->proceedFlag.load(); }); + + // This is the critical call that crashes with use-after-free if + // driverExecutor has been destroyed. + driverRawPtr->add([]() {}); + sync->completed = true; + }); + + sync->exchangeReadyEvent.await( + [sync]() { return sync->exchangeReadyFlag.load(); }); + sync->proceedFlag = true; + sync->proceedEvent.notifyAll(); + + // CORRECT shutdown order (the fix in joinExecutors()): + // 1. Join exchange CPU — drains all callbacks that dispatch to + // driverExecutor. + exchangeCpu->join(); + + // 2. Now safe to destroy driverExecutor — no outstanding raw pointer users. + // With the OLD ordering (steps 1 and 2 swapped), driverExecutor would be + // freed while the exchange thread still holds driverRawPtr, and the + // driverRawPtr->add() call above would be heap-use-after-free. + driverExecutor.reset(); + + exchangeCpu.reset(); + + EXPECT_TRUE(sync->completed); +} + +// Same scenario but using folly Promise/SemiFuture with .via(executor) to +// match the exact production code path in ExchangeClient::request(). +TEST(ShutdownOrderTest, promiseChainDispatchesSafelyDuringShutdown) { + auto driverExecutor = std::make_unique( + std::make_unique( + 2, std::make_shared("TestDriver"))); + auto exchangeCpu = std::make_unique( + 2, std::make_shared("TestExchangeCPU")); + + auto* driverRawPtr = driverExecutor.get(); + + // Set up a promise/future chain matching the production pattern: + // source->request() returns SemiFuture + // ExchangeClient does: future.via(driverExecutor).thenValue(callback) + // PrestoExchangeSource does: promise.setValue() on exchange CPU thread + auto requestPromise = std::make_shared>(); + auto clientCallbackRan = std::make_shared>(false); + auto clientFuture = requestPromise->getSemiFuture() + .via(driverRawPtr) + .thenValue([clientCallbackRan](int value) { + *clientCallbackRan = true; + }); + + auto sync = std::make_shared(); + + // Simulate handleDataResponse on exchange CPU thread fulfilling the promise. + exchangeCpu->add([sync, requestPromise]() { + sync->exchangeReadyFlag = true; + sync->exchangeReadyEvent.notifyAll(); + sync->proceedEvent.await([sync]() { return sync->proceedFlag.load(); }); + // This triggers Core::doCallback which calls driverRawPtr->add(). + // If driverRawPtr points to freed memory, this is use-after-free. + requestPromise->setValue(42); + }); + + sync->exchangeReadyEvent.await( + [sync]() { return sync->exchangeReadyFlag.load(); }); + sync->proceedFlag = true; + sync->proceedEvent.notifyAll(); + + // Correct shutdown order. + exchangeCpu->join(); + driverExecutor.reset(); + + // The client callback may or may not have run (the driver executor's + // internal thread pool was destroyed), but the critical invariant is + // that we didn't crash. Under the old ordering, the promise.setValue() + // call would dispatch to a freed executor. +} + +} // namespace facebook::presto