
Commit b7bc539

Revert connection pool cleanup (#17319)
This reverts commit 3c266bb. This reverts commit 3876d7c.

Signed-off-by: Greg Greenway <ggreenway@apple.com>
1 parent 4b7389a commit b7bc539

45 files changed

Lines changed: 400 additions & 1079 deletions

Note: large commits have some content hidden by default, so only a subset of the 45 changed files is shown below.

docs/root/intro/arch_overview/other_features/ip_transparency.rst

Lines changed: 4 additions & 0 deletions
@@ -56,6 +56,10 @@ conjunction with the
 :ref:`Original Src Listener Filter <arch_overview_ip_transparency_original_src_listener>`. Finally,
 Envoy supports generating this header using the :ref:`Proxy Protocol Transport Socket <extension_envoy.transport_sockets.upstream_proxy_protocol>`.
 
+IMPORTANT: There is currently a memory `issue <https://github.com/envoyproxy/envoy/issues/16682>`_ in Envoy where upstream connection pools are
+not cleaned up after they are created. This heavily affects the usage of this transport socket as new pools are created for every downstream client
+IP and port pair. Removing a cluster will clean up its associated connection pools, which could be used to mitigate this issue in the current state.
+
 Here is an example config for setting up the socket:
 
 .. code-block:: yaml

docs/root/version_history/current.rst

Lines changed: 0 additions & 1 deletion
@@ -49,7 +49,6 @@ Bug Fixes
 *Changes expected to improve the state of the world and are unlikely to have negative effects*
 
 * aws_lambda: if ``payload_passthrough`` is set to ``false``, the downstream response content-type header will now be set from the content-type entry in the JSON response's headers map, if present.
-* cluster: delete pools when they're idle to fix unbounded memory use when using PROXY protocol upstream with tcp_proxy. This behavior can be temporarily reverted by setting the ``envoy.reloadable_features.conn_pool_delete_when_idle`` runtime guard to false.
 * cluster: fixed the :ref:`cluster stats <config_cluster_manager_cluster_stats_request_response_sizes>` histograms by moving the accounting into the router
   filter. This means that we now properly compute the number of bytes sent as well as handling retries which were previously ignored.
 * hot_restart: fix double counting of ``server.seconds_until_first_ocsp_response_expiring`` and ``server.days_until_first_cert_expiring`` during hot-restart. This stat was only incorrect until the parent process terminated.

envoy/common/conn_pool.h

Lines changed: 7 additions & 17 deletions
@@ -44,27 +44,17 @@ class Instance {
   virtual ~Instance() = default;
 
   /**
-   * Called when a connection pool has no pending streams, busy connections, or ready connections.
+   * Called when a connection pool has been drained of pending streams, busy connections, and
+   * ready connections.
    */
-  using IdleCb = std::function<void()>;
+  using DrainedCb = std::function<void()>;
 
   /**
-   * Register a callback that gets called when the connection pool is fully idle.
+   * Register a callback that gets called when the connection pool is fully drained and kicks
+   * off a drain. The owner of the connection pool is responsible for not creating any
+   * new streams.
    */
-  virtual void addIdleCallback(IdleCb cb) PURE;
-
-  /**
-   * Returns true if the pool does not have any connections or pending requests.
-   */
-  virtual bool isIdle() const PURE;
-
-  /**
-   * Starts draining a pool, by gracefully completing all requests and gracefully closing all
-   * connections, in preparation for deletion. When the process completes, the function registered
-   * via `addIdleCallback()` is called. The callback may occur before this call returns if the pool
-   * can be immediately drained.
-   */
-  virtual void startDrain() PURE;
+  virtual void addDrainedCallback(DrainedCb cb) PURE;
 
   /**
    * Actively drain all existing connection pool connections. This method can be used in cases
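For readers skimming the revert, the restored contract is: registering a drained callback also kicks off a drain, the owner must not create new streams afterwards, and the pool invokes the callback once no pending streams or busy/ready connections remain. Below is a minimal standalone sketch of that contract; `FakePool` and the `main` harness are illustrative stand-ins, not Envoy code.

#include <functional>
#include <iostream>
#include <memory>
#include <vector>

// Minimal stand-in for ConnectionPool::Instance; only the drained-callback
// contract from the header above is reproduced.
class FakePool {
public:
  using DrainedCb = std::function<void()>;

  // Register a callback invoked once the pool has no pending streams and no
  // busy/ready clients. Per the header comment, the owner must not create
  // new streams after registering.
  void addDrainedCallback(DrainedCb cb) { drained_callbacks_.push_back(std::move(cb)); }

  // Simulate the last active connection closing, which drains the pool.
  void simulateLastConnectionClosed() {
    for (const DrainedCb& cb : drained_callbacks_) {
      cb();
    }
  }

private:
  std::vector<DrainedCb> drained_callbacks_;
};

int main() {
  auto pool = std::make_unique<FakePool>();
  bool drained = false;

  pool->addDrainedCallback([&drained]() { drained = true; });
  pool->simulateLastConnectionClosed();

  if (drained) {
    pool.reset(); // In Envoy this would typically happen via deferred deletion.
  }
  std::cout << "pool deleted: " << std::boolalpha << (pool == nullptr) << "\n";
  return 0;
}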

envoy/event/deferred_deletable.h

Lines changed: 0 additions & 6 deletions
@@ -13,12 +13,6 @@ namespace Event {
 class DeferredDeletable {
 public:
   virtual ~DeferredDeletable() = default;
-
-  /**
-   * Called when an object is passed to `deferredDelete`. This signals that the object will soon
-   * be deleted.
-   */
-  virtual void deleteIsPending() {}
 };
 
 using DeferredDeletablePtr = std::unique_ptr<DeferredDeletable>;
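The interface that remains is just a virtual destructor: the dispatcher owns the deletion timing. As background, Envoy's dispatcher destroys `DeferredDeletable` objects after the current event-loop iteration, so an object can be scheduled for deletion while code that references it is still on the call stack. The sketch below is a simplified, standalone illustration of that pattern; `ToyDispatcher` is hypothetical, not Envoy's `Event::Dispatcher`.

#include <iostream>
#include <memory>
#include <vector>

// Mirrors the interface above: anything deletable this way only needs a virtual destructor.
class DeferredDeletable {
public:
  virtual ~DeferredDeletable() = default;
};
using DeferredDeletablePtr = std::unique_ptr<DeferredDeletable>;

// Hypothetical, minimal event loop: deletions are queued and performed between
// iterations, never while the object might still be on the call stack.
class ToyDispatcher {
public:
  void deferredDelete(DeferredDeletablePtr&& to_delete) {
    deferred_.push_back(std::move(to_delete));
  }

  void runIteration() {
    // ... dispatch ready events here ...
    deferred_.clear(); // Destroy everything queued during the previous iteration.
  }

private:
  std::vector<DeferredDeletablePtr> deferred_;
};

class Connection : public DeferredDeletable {
public:
  ~Connection() override { std::cout << "connection destroyed\n"; }
};

int main() {
  ToyDispatcher dispatcher;
  dispatcher.deferredDelete(std::make_unique<Connection>());
  std::cout << "end of iteration\n";
  dispatcher.runIteration(); // "connection destroyed" prints only after the iteration ends.
  return 0;
}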

envoy/upstream/thread_local_cluster.h

Lines changed: 3 additions & 1 deletion
@@ -31,7 +31,9 @@ class HttpPoolData {
   /**
    * See documentation of Envoy::ConnectionPool::Instance.
    */
-  void addIdleCallback(ConnectionPool::Instance::IdleCb cb) { pool_->addIdleCallback(cb); };
+  void addDrainedCallback(ConnectionPool::Instance::DrainedCb cb) {
+    pool_->addDrainedCallback(cb);
+  };
 
   Upstream::HostDescriptionConstSharedPtr host() const { return pool_->host(); }

source/common/config/grpc_mux_impl.cc

Lines changed: 0 additions & 38 deletions
@@ -14,35 +14,6 @@
 namespace Envoy {
 namespace Config {
 
-namespace {
-class AllMuxesState {
-public:
-  void insert(GrpcMuxImpl* mux) {
-    absl::WriterMutexLock locker(&lock_);
-    muxes_.insert(mux);
-  }
-
-  void erase(GrpcMuxImpl* mux) {
-    absl::WriterMutexLock locker(&lock_);
-    muxes_.erase(mux);
-  }
-
-  void shutdownAll() {
-    absl::WriterMutexLock locker(&lock_);
-    for (auto& mux : muxes_) {
-      mux->shutdown();
-    }
-  }
-
-private:
-  absl::flat_hash_set<GrpcMuxImpl*> muxes_ ABSL_GUARDED_BY(lock_);
-
-  // TODO(ggreenway): can this lock be removed? Is this code only run on the main thread?
-  absl::Mutex lock_;
-};
-using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
-} // namespace
-
 GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
                          Grpc::RawAsyncClientPtr async_client, Event::Dispatcher& dispatcher,
                          const Protobuf::MethodDescriptor& service_method,
@@ -59,13 +30,8 @@ GrpcMuxImpl::GrpcMuxImpl(const LocalInfo::LocalInfo& local_info,
         onDynamicContextUpdate(resource_type_url);
       })) {
   Config::Utility::checkLocalInfo("ads", local_info);
-  AllMuxes::get().insert(this);
 }
 
-GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); }
-
-void GrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
-
 void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
   auto api_state = api_state_.find(resource_type_url);
   if (api_state == api_state_.end()) {
@@ -78,10 +44,6 @@ void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
 void GrpcMuxImpl::start() { grpc_stream_.establishNewStream(); }
 
 void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) {
-  if (shutdown_) {
-    return;
-  }
-
   ApiState& api_state = apiStateFor(type_url);
   auto& request = api_state.request_;
   request.mutable_resource_names()->Clear();
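The block deleted above implemented a small instance registry: every live `GrpcMuxImpl` inserted itself into a mutex-guarded singleton so that a process-wide `shutdownAll()` could flag each mux to stop writing to its gRPC stream during shutdown (the companion header change below references https://github.com/envoyproxy/envoy/issues/15072). The standalone sketch below reproduces that shape using std:: primitives in place of absl and `ThreadSafeSingleton`; all names here are illustrative, not Envoy's.

#include <atomic>
#include <iostream>
#include <mutex>
#include <unordered_set>

class Mux; // Forward declaration; the registry only stores pointers.

// Process-wide registry of live muxes, guarded by a mutex.
class AllMuxesState {
public:
  static AllMuxesState& get() {
    static AllMuxesState instance;
    return instance;
  }

  void insert(Mux* mux) {
    std::lock_guard<std::mutex> lock(mutex_);
    muxes_.insert(mux);
  }
  void erase(Mux* mux) {
    std::lock_guard<std::mutex> lock(mutex_);
    muxes_.erase(mux);
  }
  void shutdownAll();

private:
  std::mutex mutex_;
  std::unordered_set<Mux*> muxes_;
};

class Mux {
public:
  Mux() { AllMuxesState::get().insert(this); }
  ~Mux() { AllMuxesState::get().erase(this); }

  void shutdown() { shutdown_ = true; }

  void sendRequest() {
    if (shutdown_) {
      return; // After shutdown, never touch the (possibly dangling) stream.
    }
    std::cout << "request sent\n";
  }

private:
  std::atomic<bool> shutdown_{false};
};

void AllMuxesState::shutdownAll() {
  std::lock_guard<std::mutex> lock(mutex_);
  for (Mux* mux : muxes_) {
    mux->shutdown();
  }
}

int main() {
  Mux mux;
  mux.sendRequest();                  // Prints "request sent".
  AllMuxesState::get().shutdownAll(); // Process shutdown begins.
  mux.sendRequest();                  // Silently dropped.
  return 0;
}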

source/common/config/grpc_mux_impl.h

Lines changed: 0 additions & 15 deletions
@@ -39,17 +39,6 @@ class GrpcMuxImpl : public GrpcMux,
               Random::RandomGenerator& random, Stats::Scope& scope,
               const RateLimitSettings& rate_limit_settings, bool skip_subsequent_node);
 
-  ~GrpcMuxImpl() override;
-
-  // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
-  // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
-  // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
-  // would then cause all `GrpcMuxImpl` to be destructed.
-  // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
-  static void shutdownAll();
-
-  void shutdown() { shutdown_ = true; }
-
   void start() override;
 
   // GrpcMux
@@ -190,10 +179,6 @@
 
   Event::Dispatcher& dispatcher_;
   Common::CallbackHandlePtr dynamic_update_callback_handle_;
-
-  // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
-  // true because it may contain dangling pointers.
-  std::atomic<bool> shutdown_{false};
 };
 
 using GrpcMuxImplPtr = std::unique_ptr<GrpcMuxImpl>;

source/common/config/new_grpc_mux_impl.cc

Lines changed: 1 addition & 40 deletions
@@ -16,35 +16,6 @@
 namespace Envoy {
 namespace Config {
 
-namespace {
-class AllMuxesState {
-public:
-  void insert(NewGrpcMuxImpl* mux) {
-    absl::WriterMutexLock locker(&lock_);
-    muxes_.insert(mux);
-  }
-
-  void erase(NewGrpcMuxImpl* mux) {
-    absl::WriterMutexLock locker(&lock_);
-    muxes_.erase(mux);
-  }
-
-  void shutdownAll() {
-    absl::WriterMutexLock locker(&lock_);
-    for (auto& mux : muxes_) {
-      mux->shutdown();
-    }
-  }
-
-private:
-  absl::flat_hash_set<NewGrpcMuxImpl*> muxes_ ABSL_GUARDED_BY(lock_);
-
-  // TODO(ggreenway): can this lock be removed? Is this code only run on the main thread?
-  absl::Mutex lock_;
-};
-using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
-} // namespace
-
 NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client,
                                Event::Dispatcher& dispatcher,
                                const Protobuf::MethodDescriptor& service_method,
@@ -59,13 +30,7 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(Grpc::RawAsyncClientPtr&& async_client,
         [this](absl::string_view resource_type_url) {
          onDynamicContextUpdate(resource_type_url);
         })),
-      transport_api_version_(transport_api_version), dispatcher_(dispatcher) {
-  AllMuxes::get().insert(this);
-}
-
-NewGrpcMuxImpl::~NewGrpcMuxImpl() { AllMuxes::get().erase(this); }
-
-void NewGrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
+      transport_api_version_(transport_api_version), dispatcher_(dispatcher) {}
 
 void NewGrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
   auto sub = subscriptions_.find(resource_type_url);
@@ -251,10 +216,6 @@ void NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use
 }
 
 void NewGrpcMuxImpl::trySendDiscoveryRequests() {
-  if (shutdown_) {
-    return;
-  }
-
   while (true) {
     // Do any of our subscriptions even want to send a request?
     absl::optional<std::string> maybe_request_type = whoWantsToSendDiscoveryRequest();

source/common/config/new_grpc_mux_impl.h

Lines changed: 0 additions & 15 deletions
@@ -38,17 +38,6 @@ class NewGrpcMuxImpl
                  const RateLimitSettings& rate_limit_settings,
                  const LocalInfo::LocalInfo& local_info);
 
-  ~NewGrpcMuxImpl() override;
-
-  // Causes all NewGrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
-  // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
-  // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
-  // would then cause all `NewGrpcMuxImpl` to be destructed.
-  // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
-  static void shutdownAll();
-
-  void shutdown() { shutdown_ = true; }
-
   GrpcMuxWatchPtr addWatch(const std::string& type_url,
                            const absl::flat_hash_set<std::string>& resources,
                            SubscriptionCallbacks& callbacks,
@@ -181,10 +170,6 @@
   Common::CallbackHandlePtr dynamic_update_callback_handle_;
   const envoy::config::core::v3::ApiVersion transport_api_version_;
   Event::Dispatcher& dispatcher_;
-
-  // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
-  // true because it may contain dangling pointers.
-  std::atomic<bool> shutdown_{false};
 };
 
 using NewGrpcMuxImplPtr = std::unique_ptr<NewGrpcMuxImpl>;

source/common/conn_pool/conn_pool_base.cc

Lines changed: 20 additions & 30 deletions
@@ -28,13 +28,9 @@ ConnPoolImplBase::ConnPoolImplBase(
     upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {}
 
 ConnPoolImplBase::~ConnPoolImplBase() {
-  ASSERT(isIdleImpl());
-  ASSERT(connecting_stream_capacity_ == 0);
-}
-
-void ConnPoolImplBase::deleteIsPendingImpl() {
-  deferred_deleting_ = true;
-  ASSERT(isIdleImpl());
+  ASSERT(ready_clients_.empty());
+  ASSERT(busy_clients_.empty());
+  ASSERT(connecting_clients_.empty());
   ASSERT(connecting_stream_capacity_ == 0);
 }
 
@@ -233,8 +229,6 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
 }
 
 ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context) {
-  ASSERT(!deferred_deleting_);
-
   ASSERT(static_cast<ssize_t>(connecting_stream_capacity_) ==
          connectingCapacity(connecting_clients_)); // O(n) debug check.
   if (!ready_clients_.empty()) {
@@ -282,7 +276,6 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStream(AttachContext& context)
 }
 
 bool ConnPoolImplBase::maybePreconnect(float global_preconnect_ratio) {
-  ASSERT(!deferred_deleting_);
   return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
 }
 
@@ -333,11 +326,9 @@ void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client,
   }
 }
 
-void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); }
-
-void ConnPoolImplBase::startDrainImpl() {
-  is_draining_ = true;
-  checkForIdleAndCloseIdleConnsIfDraining();
+void ConnPoolImplBase::addDrainedCallbackImpl(Instance::DrainedCb cb) {
+  drained_callbacks_.push_back(cb);
+  checkForDrained();
 }
 
 void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
@@ -379,19 +370,17 @@ void ConnPoolImplBase::drainConnectionsImpl() {
   }
 }
 
-bool ConnPoolImplBase::isIdleImpl() const {
-  return pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
-         connecting_clients_.empty();
-}
-
-void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() {
-  if (is_draining_) {
-    closeIdleConnectionsForDrainingPool();
+void ConnPoolImplBase::checkForDrained() {
+  if (drained_callbacks_.empty()) {
+    return;
   }
 
-  if (isIdleImpl()) {
-    ENVOY_LOG(debug, "invoking idle callbacks - is_draining_={}", is_draining_);
-    for (const Instance::IdleCb& cb : idle_callbacks_) {
+  closeIdleConnectionsForDrainingPool();
+
+  if (pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
+      connecting_clients_.empty()) {
+    ENVOY_LOG(debug, "invoking drained callbacks");
+    for (const Instance::DrainedCb& cb : drained_callbacks_) {
      cb();
     }
   }
@@ -454,8 +443,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
   client.releaseResources();
 
   dispatcher_.deferredDelete(client.removeFromList(owningList(client.state())));
-
-  checkForIdleAndCloseIdleConnsIfDraining();
+  if (incomplete_stream) {
+    checkForDrained();
+  }
 
   client.setState(ActiveClient::State::CLOSED);
 
@@ -473,7 +463,7 @@
     // refer to client after this point.
     onConnected(client);
     onUpstreamReady();
-    checkForIdleAndCloseIdleConnsIfDraining();
+    checkForDrained();
   }
 }
 
@@ -543,7 +533,7 @@ void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
   }
 
   host_->cluster().stats().upstream_rq_cancelled_.inc();
-  checkForIdleAndCloseIdleConnsIfDraining();
+  checkForDrained();
 }
 
 namespace {
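The net effect of this file's revert is that drain detection is again driven by checkForDrained(): it does nothing until at least one drained callback has been registered, then closes idle connections and fires the callbacks only once the pending-stream list and every client list are empty. Below is a standalone sketch of that control flow; the container names mirror the diff, but the class is a mock, not ConnPoolImplBase.

#include <functional>
#include <iostream>
#include <list>
#include <vector>

// Mock with the same shape as the checkForDrained() logic restored above.
class MockPool {
public:
  using DrainedCb = std::function<void()>;

  void addDrainedCallback(DrainedCb cb) {
    drained_callbacks_.push_back(std::move(cb));
    checkForDrained();
  }

  // Simulate the only in-flight stream completing: its connection becomes ready.
  void finishStream() {
    if (!busy_clients_.empty()) {
      ready_clients_.splice(ready_clients_.begin(), busy_clients_, busy_clients_.begin());
    }
    checkForDrained();
  }

  std::list<int> ready_clients_{};
  std::list<int> busy_clients_{1}; // One connection carrying a stream.
  std::list<int> pending_streams_{};

private:
  void checkForDrained() {
    if (drained_callbacks_.empty()) {
      return; // Nothing to do until a drain has been requested.
    }
    // Draining: idle (ready) connections are closed immediately.
    ready_clients_.clear();
    if (pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty()) {
      std::cout << "invoking drained callbacks\n";
      for (const DrainedCb& cb : drained_callbacks_) {
        cb();
      }
    }
  }

  std::vector<DrainedCb> drained_callbacks_;
};

int main() {
  MockPool pool;
  pool.addDrainedCallback([]() { std::cout << "pool drained, safe to tear down\n"; });
  pool.finishStream(); // Last busy connection goes idle, is closed, callbacks fire.
  return 0;
}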
