Skip to content

Commit 114d5ae

Browse files
authored
cluster: destroy on main thread (#14954)
Signed-off-by: Yuchen Dai <silentdai@gmail.com>
1 parent a50b1c0 commit 114d5ae

22 files changed

Lines changed: 396 additions & 13 deletions

include/envoy/event/BUILD

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,17 @@ envoy_cc_library(
1313
hdrs = ["deferred_deletable.h"],
1414
)
1515

16+
envoy_cc_library(
17+
name = "dispatcher_thread_deletable",
18+
hdrs = ["dispatcher_thread_deletable.h"],
19+
)
20+
1621
envoy_cc_library(
1722
name = "dispatcher_interface",
1823
hdrs = ["dispatcher.h"],
1924
deps = [
2025
":deferred_deletable",
26+
":dispatcher_thread_deletable",
2127
":file_event_interface",
2228
":scaled_timer",
2329
":schedulable_cb_interface",

include/envoy/event/dispatcher.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include "envoy/common/scope_tracker.h"
1010
#include "envoy/common/time.h"
11+
#include "envoy/event/dispatcher_thread_deletable.h"
1112
#include "envoy/event/file_event.h"
1213
#include "envoy/event/scaled_timer.h"
1314
#include "envoy/event/schedulable_cb.h"
@@ -260,6 +261,12 @@ class Dispatcher : public DispatcherBase {
260261
*/
261262
virtual void post(PostCb callback) PURE;
262263

264+
/**
265+
* Post the deletable to this dispatcher. The deletable objects are guaranteed to be destroyed on
266+
* the dispatcher's thread before dispatcher destroy. This is safe cross thread.
267+
*/
268+
virtual void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) PURE;
269+
263270
/**
264271
* Runs the event loop. This will not return until exit() is called either from within a callback
265272
* or from a different thread.
@@ -287,6 +294,11 @@ class Dispatcher : public DispatcherBase {
287294
* Updates approximate monotonic time to current value.
288295
*/
289296
virtual void updateApproximateMonotonicTime() PURE;
297+
298+
/**
299+
* Shutdown the dispatcher by clear dispatcher thread deletable.
300+
*/
301+
virtual void shutdown() PURE;
290302
};
291303

292304
using DispatcherPtr = std::unique_ptr<Dispatcher>;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#pragma once
2+
3+
#include <memory>
4+
5+
namespace Envoy {
6+
namespace Event {
7+
8+
/**
9+
* If an object derives from this class, it can be passed to the destination dispatcher who
10+
* guarantees to delete it in that dispatcher thread. The common use case is to ensure config
11+
* related objects are deleted in the main thread.
12+
*/
13+
class DispatcherThreadDeletable {
14+
public:
15+
virtual ~DispatcherThreadDeletable() = default;
16+
};
17+
18+
using DispatcherThreadDeletableConstPtr = std::unique_ptr<const DispatcherThreadDeletable>;
19+
20+
} // namespace Event
21+
} // namespace Envoy

source/common/event/dispatcher_impl.cc

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
6363
? watermark_factory
6464
: std::make_shared<Buffer::WatermarkBufferFactory>()),
6565
scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
66+
thread_local_delete_cb_(
67+
base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
6668
deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
6769
[this]() -> void { clearDeferredDeleteList(); })),
6870
post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
@@ -74,7 +76,12 @@ DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
7476
std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
7577
}
7678

77-
DispatcherImpl::~DispatcherImpl() { FatalErrorHandler::removeFatalErrorHandler(*this); }
79+
DispatcherImpl::~DispatcherImpl() {
80+
ENVOY_LOG(debug, "destroying dispatcher {}", name_);
81+
FatalErrorHandler::removeFatalErrorHandler(*this);
82+
// TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
83+
// ASSERT(deletable_in_dispatcher_thread_.empty())
84+
}
7885

7986
void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
8087
std::chrono::milliseconds min_touch_interval) {
@@ -265,9 +272,23 @@ void DispatcherImpl::post(std::function<void()> callback) {
265272
}
266273
}
267274

275+
void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
276+
bool need_schedule;
277+
{
278+
Thread::LockGuard lock(thread_local_deletable_lock_);
279+
need_schedule = deletables_in_dispatcher_thread_.empty();
280+
deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
281+
// TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
282+
// ASSERT(!shutdown_called_, "inserted after shutdown");
283+
}
284+
285+
if (need_schedule) {
286+
thread_local_delete_cb_->scheduleCallbackCurrentIteration();
287+
}
288+
}
289+
268290
void DispatcherImpl::run(RunType type) {
269291
run_tid_ = api_.threadFactory().currentThreadId();
270-
271292
// Flush all post callbacks before we run the event loop. We do this because there are post
272293
// callbacks that have to get run before the initial event loop starts running. libevent does
273294
// not guarantee that events are run in any particular order. So even if we post() and call
@@ -280,12 +301,56 @@ MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
280301
return approximate_monotonic_time_;
281302
}
282303

304+
void DispatcherImpl::shutdown() {
305+
// TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below
306+
// below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
307+
// callbacks and dispatcher thread deletable objects.
308+
ASSERT(isThreadSafe());
309+
auto deferred_deletables_size = current_to_delete_->size();
310+
std::list<std::function<void()>>::size_type post_callbacks_size;
311+
{
312+
Thread::LockGuard lock(post_lock_);
313+
post_callbacks_size = post_callbacks_.size();
314+
}
315+
316+
std::list<DispatcherThreadDeletableConstPtr> local_deletables;
317+
{
318+
Thread::LockGuard lock(thread_local_deletable_lock_);
319+
local_deletables = std::move(deletables_in_dispatcher_thread_);
320+
}
321+
auto thread_local_deletables_size = local_deletables.size();
322+
while (!local_deletables.empty()) {
323+
local_deletables.pop_front();
324+
}
325+
ASSERT(!shutdown_called_);
326+
shutdown_called_ = true;
327+
ENVOY_LOG(
328+
trace,
329+
"{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
330+
__FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size);
331+
}
332+
283333
void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }
284334

285335
void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
286336
approximate_monotonic_time_ = api_.timeSource().monotonicTime();
287337
}
288338

339+
void DispatcherImpl::runThreadLocalDelete() {
340+
std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
341+
{
342+
Thread::LockGuard lock(thread_local_deletable_lock_);
343+
to_be_delete = std::move(deletables_in_dispatcher_thread_);
344+
ASSERT(deletables_in_dispatcher_thread_.empty());
345+
}
346+
while (!to_be_delete.empty()) {
347+
// Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
348+
// executing complicated destruction.
349+
touchWatchdog();
350+
// Delete in FIFO order.
351+
to_be_delete.pop_front();
352+
}
353+
}
289354
void DispatcherImpl::runPostCallbacks() {
290355
// Clear the deferred delete list before running post callbacks to reduce non-determinism in
291356
// callback processing, and more easily detect if a scheduled post callback refers to one of the

source/common/event/dispatcher_impl.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,14 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
8686
void exit() override;
8787
SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override;
8888
void post(std::function<void()> callback) override;
89+
void deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) override;
8990
void run(RunType type) override;
9091
Buffer::WatermarkFactory& getWatermarkFactory() override { return *buffer_factory_; }
9192
void pushTrackedObject(const ScopeTrackedObject* object) override;
9293
void popTrackedObject(const ScopeTrackedObject* expected_object) override;
9394
MonotonicTime approximateMonotonicTime() const override;
9495
void updateApproximateMonotonicTime() override;
96+
void shutdown() override;
9597

9698
// FatalErrorInterface
9799
void onFatalError(std::ostream& os) const override;
@@ -127,6 +129,8 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
127129
TimerPtr createTimerInternal(TimerCb cb);
128130
void updateApproximateMonotonicTimeInternal();
129131
void runPostCallbacks();
132+
void runThreadLocalDelete();
133+
130134
// Helper used to touch the watchdog after most schedulable, fd, and timer callbacks.
131135
void touchWatchdog();
132136

@@ -145,13 +149,24 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>,
145149
Buffer::WatermarkFactorySharedPtr buffer_factory_;
146150
LibeventScheduler base_scheduler_;
147151
SchedulerPtr scheduler_;
152+
153+
SchedulableCallbackPtr thread_local_delete_cb_;
154+
Thread::MutexBasicLockable thread_local_deletable_lock_;
155+
// `deletables_in_dispatcher_thread` must be destroyed last to allow other callbacks populate.
156+
std::list<DispatcherThreadDeletableConstPtr>
157+
deletables_in_dispatcher_thread_ ABSL_GUARDED_BY(thread_local_deletable_lock_);
158+
bool shutdown_called_{false};
159+
148160
SchedulableCallbackPtr deferred_delete_cb_;
161+
149162
SchedulableCallbackPtr post_cb_;
163+
Thread::MutexBasicLockable post_lock_;
164+
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);
165+
150166
std::vector<DeferredDeletablePtr> to_delete_1_;
151167
std::vector<DeferredDeletablePtr> to_delete_2_;
152168
std::vector<DeferredDeletablePtr>* current_to_delete_;
153-
Thread::MutexBasicLockable post_lock_;
154-
std::list<std::function<void()>> post_callbacks_ ABSL_GUARDED_BY(post_lock_);
169+
155170
absl::InlinedVector<const ScopeTrackedObject*, ExpectedMaxTrackedObjectStackDepth>
156171
tracked_object_stack_;
157172
bool deferred_deleting_{};

source/common/grpc/async_client_impl.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ void AsyncStreamImpl::cleanup() {
210210
// This will destroy us, but only do so if we are actually in a list. This does not happen in
211211
// the immediate failure case.
212212
if (LinkedObject<AsyncStreamImpl>::inserted()) {
213+
ASSERT(dispatcher_->isThreadSafe());
213214
dispatcher_->deferredDelete(
214215
LinkedObject<AsyncStreamImpl>::removeFromList(parent_.active_streams_));
215216
}

source/common/http/async_client_impl.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) {
149149
}
150150

151151
void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
152+
ASSERT(dispatcher().isThreadSafe());
152153
// Map send calls after local closure to no-ops. The send call could have been queued prior to
153154
// remote reset or closure, and/or closure could have occurred synchronously in response to a
154155
// previous send. In these cases the router will have already cleaned up stream state. This
@@ -169,6 +170,7 @@ void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
169170
}
170171

171172
void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) {
173+
ASSERT(dispatcher().isThreadSafe());
172174
// See explanation in sendData.
173175
if (local_closed_) {
174176
return;
@@ -226,6 +228,7 @@ void AsyncStreamImpl::reset() {
226228
}
227229

228230
void AsyncStreamImpl::cleanup() {
231+
ASSERT(dispatcher().isThreadSafe());
229232
local_closed_ = remote_closed_ = true;
230233
// This will destroy us, but only do so if we are actually in a list. This does not happen in
231234
// the immediate failure case.

source/common/network/connection_impl.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ Connection::State ConnectionImpl::state() const {
211211
void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }
212212

213213
void ConnectionImpl::setTransportSocketIsReadable() {
214+
ASSERT(dispatcher_.isThreadSafe());
214215
// Remember that the transport requested read resumption, in case the resumption event is not
215216
// scheduled immediately or is "lost" because read was disabled.
216217
transport_wants_read_ = true;
@@ -301,6 +302,7 @@ void ConnectionImpl::noDelay(bool enable) {
301302
}
302303

303304
void ConnectionImpl::onRead(uint64_t read_buffer_size) {
305+
ASSERT(dispatcher_.isThreadSafe());
304306
if (inDelayedClose() || !filterChainWantsData()) {
305307
return;
306308
}
@@ -420,6 +422,7 @@ void ConnectionImpl::raiseEvent(ConnectionEvent event) {
420422
bool ConnectionImpl::readEnabled() const {
421423
// Calls to readEnabled on a closed socket are considered to be an error.
422424
ASSERT(state() == State::Open);
425+
ASSERT(dispatcher_.isThreadSafe());
423426
return read_disable_count_ == 0;
424427
}
425428

@@ -437,6 +440,7 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream) {
437440

438441
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
439442
ASSERT(!end_stream || enable_half_close_);
443+
ASSERT(dispatcher_.isThreadSafe());
440444

441445
if (write_end_stream_) {
442446
// It is an API violation to write more data after writing end_stream, but a duplicate

source/common/upstream/upstream_impl.cc

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -926,9 +926,16 @@ ClusterImplBase::ClusterImplBase(
926926

927927
auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
928928
cluster.transport_socket_matches(), factory_context, socket_factory, *stats_scope);
929-
info_ = std::make_unique<ClusterInfoImpl>(cluster, factory_context.clusterManager().bindConfig(),
930-
runtime, std::move(socket_matcher),
931-
std::move(stats_scope), added_via_api, factory_context);
929+
auto& dispatcher = factory_context.dispatcher();
930+
info_ = std::shared_ptr<const ClusterInfoImpl>(
931+
new ClusterInfoImpl(cluster, factory_context.clusterManager().bindConfig(), runtime,
932+
std::move(socket_matcher), std::move(stats_scope), added_via_api,
933+
factory_context),
934+
[&dispatcher](const ClusterInfoImpl* self) {
935+
ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
936+
dispatcher.deleteInDispatcherThread(
937+
std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
938+
});
932939

933940
if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN) &&
934941
!raw_factory_pointer->supportsAlpn()) {
@@ -1120,7 +1127,7 @@ void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
11201127
for (size_t priority = 0; priority < host_sets.size(); ++priority) {
11211128
const auto& host_set = host_sets[priority];
11221129
// TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
1123-
HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts()));
1130+
HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());
11241131

11251132
HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
11261133
prioritySet().updateHosts(priority,
@@ -1311,10 +1318,10 @@ void PriorityStateManager::registerHostForPriority(
13111318
auto metadata = lb_endpoint.has_metadata()
13121319
? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
13131320
: nullptr;
1314-
const HostSharedPtr host(new HostImpl(
1321+
const auto host = std::make_shared<HostImpl>(
13151322
parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(),
13161323
locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(),
1317-
locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source));
1324+
locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source);
13181325
registerHostForPriority(host, locality_lb_endpoint);
13191326
}
13201327

source/common/upstream/upstream_impl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,9 @@ class PrioritySetImpl : public PrioritySet {
509509
/**
510510
* Implementation of ClusterInfo that reads from JSON.
511511
*/
512-
class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::Id::upstream> {
512+
class ClusterInfoImpl : public ClusterInfo,
513+
public Event::DispatcherThreadDeletable,
514+
protected Logger::Loggable<Logger::Id::upstream> {
513515
public:
514516
using HttpProtocolOptionsConfigImpl =
515517
Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl;

0 commit comments

Comments
 (0)