Skip to content

Commit 028387a

Browse files
authored
tcp_proxy: convert TCP proxy to use TCP connection pool (#3938)
Converts TcpProxy::Filter and WebSocket::WsHandlerImpl to use Tcp::ConnectionPool to obtain connections. Much of the stats handling and connection timeouts are handled by the connection pool. Stats were manually verified by comparing stats produced by the tcp_proxy_integration_test with and without the connection pool change. *Risk Level*: medium *Testing*: unit/integration testing *Docs Changes*: n/a *Release Notes*: n/a Signed-off-by: Stephan Zuercher <stephan@turbinelabs.io>
1 parent f882e74 commit 028387a

17 files changed

Lines changed: 418 additions & 475 deletions

File tree

include/envoy/tcp/conn_pool.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks {
5858
};
5959

6060
/*
61-
* ConnectionData wraps a ClientConnection allocated to a caller.
61+
* ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
62+
* released back to the pool for re-use when their containing ConnectionData is destroyed.
6263
*/
6364
class ConnectionData {
6465
public:
@@ -76,14 +77,10 @@ class ConnectionData {
7677
* @param callback the UpstreamCallbacks to invoke for upstream data
7778
*/
7879
virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;
79-
80-
/**
81-
* Release the connection after use. The connection should be closed first only if it is
82-
* not viable for future use.
83-
*/
84-
virtual void release() PURE;
8580
};
8681

82+
typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;
83+
8784
/**
8885
* Pool callbacks invoked in the context of a newConnection() call, either synchronously or
8986
* asynchronously.
@@ -102,14 +99,17 @@ class Callbacks {
10299
Upstream::HostDescriptionConstSharedPtr host) PURE;
103100

104101
/**
105-
* Called when a connection is available to process a request/response. Recipients of connections
106-
* must release the connection after use. They should only close the underlying ClientConnection
107-
* if it is no longer viable for future requests.
102+
* Called when a connection is available to process a request/response. Connections may be
103+
* released back to the pool for re-use by resetting the ConnectionDataPtr. If the connection is
104+
* no longer viable for reuse (e.g. due to some kind of protocol error), the underlying
105+
* ClientConnection should be closed to prevent its reuse.
106+
*
108107
* @param conn supplies the connection data to use.
109108
* @param host supplies the description of the host that will carry the request. For logical
110109
* connection pools the description may be different each time this is called.
111110
*/
112-
virtual void onPoolReady(ConnectionData& conn, Upstream::HostDescriptionConstSharedPtr host) PURE;
111+
virtual void onPoolReady(ConnectionDataPtr&& conn,
112+
Upstream::HostDescriptionConstSharedPtr host) PURE;
113113
};
114114

115115
/**

source/common/http/websocket/ws_handler_impl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ void WsHandlerImpl::onConnectionSuccess() {
131131
// the connection pool. The current approach is a stop gap solution, where
132132
// we put the onus on the user to tell us if a route (and corresponding upstream)
133133
// is supposed to allow websocket upgrades or not.
134-
Http1::ClientConnectionImpl upstream_http(*upstream_connection_, http_conn_callbacks_);
134+
Http1::ClientConnectionImpl upstream_http(upstream_conn_data_->connection(),
135+
http_conn_callbacks_);
135136
Http1::RequestStreamEncoderImpl upstream_request = Http1::RequestStreamEncoderImpl(upstream_http);
136137
upstream_request.encodeHeaders(request_headers_, false);
137138
ASSERT(state_ == ConnectState::PreConnect);

source/common/tcp/conn_pool.cc

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {
4646

4747
void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks) {
4848
ASSERT(conn.wrapper_ == nullptr);
49-
conn.wrapper_ = std::make_unique<ConnectionWrapper>(conn);
50-
callbacks.onPoolReady(*conn.wrapper_, conn.real_host_description_);
49+
conn.wrapper_ = std::make_shared<ConnectionWrapper>(conn);
50+
51+
callbacks.onPoolReady(std::make_unique<ConnectionDataImpl>(conn.wrapper_),
52+
conn.real_host_description_);
5153
}
5254

5355
void ConnPoolImpl::checkForDrained() {
@@ -124,6 +126,8 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
124126
host_->cluster().stats().upstream_cx_destroy_remote_with_active_rq_.inc();
125127
}
126128
host_->cluster().stats().upstream_cx_destroy_with_active_rq_.inc();
129+
130+
conn.wrapper_->release(true);
127131
}
128132

129133
removed = conn.removeFromList(busy_conns_);
@@ -259,23 +263,29 @@ ConnPoolImpl::ConnectionWrapper::ConnectionWrapper(ActiveConn& parent) : parent_
259263
parent_.parent_.host_->stats().rq_active_.inc();
260264
}
261265

262-
ConnPoolImpl::ConnectionWrapper::~ConnectionWrapper() {
263-
parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
264-
parent_.parent_.host_->stats().rq_active_.dec();
266+
Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() {
267+
ASSERT(!released_);
268+
return *parent_.conn_;
265269
}
266270

267-
Network::ClientConnection& ConnPoolImpl::ConnectionWrapper::connection() { return *parent_.conn_; }
268-
269271
void ConnPoolImpl::ConnectionWrapper::addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& cb) {
270272
ASSERT(!released_);
271273
callbacks_ = &cb;
272274
}
273275

274-
void ConnPoolImpl::ConnectionWrapper::release() {
275-
ASSERT(!released_);
276-
released_ = true;
277-
callbacks_ = nullptr;
278-
parent_.parent_.onConnReleased(parent_);
276+
void ConnPoolImpl::ConnectionWrapper::release(bool closed) {
277+
// Allow multiple calls: connection close and destruction of ConnectionDataImplPtr will both
278+
// result in this call.
279+
if (!released_) {
280+
released_ = true;
281+
callbacks_ = nullptr;
282+
if (!closed) {
283+
parent_.parent_.onConnReleased(parent_);
284+
}
285+
286+
parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
287+
parent_.parent_.host_->stats().rq_active_.dec();
288+
}
279289
}
280290

281291
ConnPoolImpl::PendingRequest::PendingRequest(ConnPoolImpl& parent,

source/common/tcp/conn_pool.h

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,32 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
3434
protected:
3535
struct ActiveConn;
3636

37-
struct ConnectionWrapper : public ConnectionPool::ConnectionData {
37+
struct ConnectionWrapper {
3838
ConnectionWrapper(ActiveConn& parent);
39-
~ConnectionWrapper();
4039

41-
// ConnectionPool::ConnectionData
42-
Network::ClientConnection& connection() override;
43-
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override;
44-
void release() override;
40+
Network::ClientConnection& connection();
41+
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
42+
void release(bool closed);
4543

4644
ActiveConn& parent_;
4745
ConnectionPool::UpstreamCallbacks* callbacks_{};
4846
bool released_{false};
4947
};
5048

51-
typedef std::unique_ptr<ConnectionWrapper> ConnectionWrapperPtr;
49+
typedef std::shared_ptr<ConnectionWrapper> ConnectionWrapperSharedPtr;
50+
51+
struct ConnectionDataImpl : public ConnectionPool::ConnectionData {
52+
ConnectionDataImpl(ConnectionWrapperSharedPtr wrapper) : wrapper_(wrapper) {}
53+
~ConnectionDataImpl() { wrapper_->release(false); }
54+
55+
// ConnectionPool::ConnectionData
56+
Network::ClientConnection& connection() override { return wrapper_->connection(); }
57+
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
58+
wrapper_->addUpstreamCallbacks(callbacks);
59+
};
60+
61+
ConnectionWrapperSharedPtr wrapper_;
62+
};
5263

5364
struct ConnReadFilter : public Network::ReadFilterBaseImpl {
5465
ConnReadFilter(ActiveConn& parent) : parent_(parent) {}
@@ -78,7 +89,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
7889

7990
ConnPoolImpl& parent_;
8091
Upstream::HostDescriptionConstSharedPtr real_host_description_;
81-
ConnectionWrapperPtr wrapper_;
92+
ConnectionWrapperSharedPtr wrapper_;
8293
Network::ClientConnectionPtr conn_;
8394
Event::TimerPtr connect_timer_;
8495
Stats::TimespanPtr conn_length_;

source/common/tcp_proxy/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ envoy_cc_library(
2424
"//include/envoy/stats:stats_interface",
2525
"//include/envoy/stats:stats_macros",
2626
"//include/envoy/stats:timespan",
27+
"//include/envoy/tcp:conn_pool_interface",
2728
"//include/envoy/upstream:cluster_manager_interface",
2829
"//include/envoy/upstream:upstream_interface",
2930
"//source/common/access_log:access_log_lib",

0 commit comments

Comments
 (0)