Skip to content

Commit 23e5fc2

Browse files
authored
udp_proxy: added per packet load balancing possibility (#18605)
Signed-off-by: Michal Maka <m.maka@partner.samsung.com>
1 parent dc2ac22 commit 23e5fc2

File tree

7 files changed

+377
-62
lines changed

7 files changed

+377
-62
lines changed

api/envoy/extensions/filters/udp/udp_proxy/v3/udp_proxy.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
2020
// [#extension: envoy.filters.udp_listener.udp_proxy]
2121

2222
// Configuration for the UDP proxy filter.
23-
// [#next-free-field: 7]
23+
// [#next-free-field: 8]
2424
message UdpProxyConfig {
2525
option (udpa.annotations.versioning).previous_message_type =
2626
"envoy.config.filter.udp.udp_proxy.v2alpha.UdpProxyConfig";
@@ -82,4 +82,9 @@ message UdpProxyConfig {
8282
// :ref:`prefer_gro <envoy_v3_api_field_config.core.v3.UdpSocketConfig.prefer_gro>` is true for upstream
8383
// sockets as the assumption is datagrams will be received from a single source.
8484
config.core.v3.UdpSocketConfig upstream_socket_config = 6;
85+
86+
// Perform per packet load balancing (upstream host selection) on each received data chunk.
87+
// The default if not specified is false, that means each data chunk is forwarded
88+
// to upstream host selected on first chunk receival for that "session" (identified by source IP/port and local IP/port).
89+
bool use_per_packet_load_balancing = 7;
8590
}

docs/root/configuration/listeners/udp_filters/udp_proxy.rst

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,26 @@ Each session is index by the 4-tuple consisting of source IP/port and local IP/p
2020
datagram is received on. Sessions last until the :ref:`idle timeout
2121
<envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.idle_timeout>` is reached.
2222

23+
Above *session stickness* could be disabled by setting :ref:`use_per_packet_load_balancing
24+
<envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.use_per_packet_load_balancing>` to true.
25+
In that case, *per packet load balancing* is enabled. It means that upstream host is selected on every single data chunk
26+
received by udp proxy using currently used load balancing policy.
27+
2328
The UDP proxy listener filter also can operate as a *transparent* proxy if the
2429
:ref:`use_original_src_ip <envoy_v3_api_msg_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig>`
25-
field is set. But please keep in mind that it does not forward the port to upstreams. It forwards only the IP address to upstreams.
30+
field is set to true. But please keep in mind that it does not forward the port to upstreams. It forwards only the IP address to upstreams.
2631

2732
Load balancing and unhealthy host handling
2833
------------------------------------------
2934

3035
Envoy will fully utilize the configured load balancer for the configured upstream cluster when
31-
load balancing UDP datagrams. When a new session is created, Envoy will associate the session
36+
load balancing UDP datagrams. By default, when a new session is created, Envoy will associate the session
3237
with an upstream host selected using the configured load balancer. All future datagrams that
33-
belong to the session will be routed to the same upstream host.
38+
belong to the session will be routed to the same upstream host. However, if :ref:`use_per_packet_load_balancing
39+
<envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.use_per_packet_load_balancing>`
40+
field is set to true, Envoy selects another upstream host on next datagram using the configured load balancer
41+
and creates a new session if such does not exist. So in case of several upstream hosts available for the load balancer
42+
each data chunk is forwarded to a different host.
3443

3544
When an upstream host becomes unhealthy (due to :ref:`active health checking
3645
<arch_overview_health_checking>`), Envoy will attempt to create a new session to a healthy host

docs/root/version_history/current.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ New Features
8484
* tls_inspector filter: added :ref:`enable_ja3_fingerprinting <envoy_v3_api_field_extensions.filters.listener.tls_inspector.v3.TlsInspector.enable_ja3_fingerprinting>` to create JA3 fingerprint hash from Client Hello message.
8585
* transport_socket: added :ref:`envoy.transport_sockets.tcp_stats <envoy_v3_api_msg_extensions.transport_sockets.tcp_stats.v3.Config>` which generates additional statistics gathered from the OS TCP stack.
8686
* udp: add support for multiple listener filters.
87+
* udp_proxy: added :ref:`use_per_packet_load_balancing <envoy_v3_api_field_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.use_per_packet_load_balancing>` option to enable per packet load balancing (selection of upstream host on each data chunk).
8788
* upstream: added the ability to :ref:`configure max connection duration <envoy_v3_api_field_config.core.v3.HttpProtocolOptions.max_connection_duration>` for upstream clusters.
8889
* vcl_socket_interface: added VCL socket interface extension for fd.io VPP integration to :ref:`contrib images <install_contrib>`. This can be enabled via :ref:`VCL <envoy_v3_api_msg_extensions.vcl.v3alpha.VclSocketInterface>` configuration.
8990
* xds: re-introduced unified delta and sotw xDS multiplexers that share most of the implementation. Added a new runtime config ``envoy.reloadable_features.unified_mux`` (disabled by default) that when enabled, switches xDS to use unified multiplexers.

source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc

Lines changed: 109 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@ void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster)
2727
}
2828

2929
ENVOY_LOG(debug, "udp proxy: attaching to cluster {}", cluster.info()->name());
30-
ASSERT(cluster_info_ == absl::nullopt || &cluster_info_.value().cluster_ != &cluster);
31-
cluster_info_.emplace(*this, cluster);
30+
ASSERT(cluster_info_ == absl::nullopt || &cluster_info_.value()->cluster_ != &cluster);
31+
32+
if (config_->usingPerPacketLoadBalancing()) {
33+
cluster_info_.emplace(std::make_unique<PerPacketLoadBalancingClusterInfo>(*this, cluster));
34+
} else {
35+
cluster_info_.emplace(std::make_unique<StickySessionClusterInfo>(*this, cluster));
36+
}
3237
}
3338

3439
void UdpProxyFilter::onClusterRemoval(const std::string& cluster) {
@@ -46,7 +51,7 @@ Network::FilterStatus UdpProxyFilter::onData(Network::UdpRecvData& data) {
4651
return Network::FilterStatus::StopIteration;
4752
}
4853

49-
return cluster_info_.value().onData(data);
54+
return cluster_info_.value()->onData(data);
5055
}
5156

5257
Network::FilterStatus UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode) {
@@ -56,9 +61,10 @@ Network::FilterStatus UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode)
5661
}
5762

5863
UdpProxyFilter::ClusterInfo::ClusterInfo(UdpProxyFilter& filter,
59-
Upstream::ThreadLocalCluster& cluster)
64+
Upstream::ThreadLocalCluster& cluster,
65+
SessionStorageType&& sessions)
6066
: filter_(filter), cluster_(cluster),
61-
cluster_stats_(generateStats(cluster.info()->statsScope())),
67+
cluster_stats_(generateStats(cluster.info()->statsScope())), sessions_(std::move(sessions)),
6268
member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb(
6369
[this](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) {
6470
for (const auto& host : hosts_removed) {
@@ -85,36 +91,84 @@ UdpProxyFilter::ClusterInfo::~ClusterInfo() {
8591
ASSERT(host_to_sessions_.empty());
8692
}
8793

88-
Network::FilterStatus UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData& data) {
94+
void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) {
95+
// First remove from the host to sessions map.
96+
ASSERT(host_to_sessions_[&session->host()].count(session) == 1);
97+
auto host_sessions_it = host_to_sessions_.find(&session->host());
98+
host_sessions_it->second.erase(session);
99+
if (host_sessions_it->second.empty()) {
100+
host_to_sessions_.erase(host_sessions_it);
101+
}
102+
103+
// Now remove it from the primary map.
104+
ASSERT(sessions_.count(session) == 1);
105+
sessions_.erase(session);
106+
}
107+
108+
UdpProxyFilter::ActiveSession*
109+
UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses,
110+
const Upstream::HostConstSharedPtr& optional_host) {
111+
if (!cluster_.info()
112+
->resourceManager(Upstream::ResourcePriority::Default)
113+
.connections()
114+
.canCreate()) {
115+
ENVOY_LOG(debug, "cannot create new connection.");
116+
cluster_.info()->stats().upstream_cx_overflow_.inc();
117+
return nullptr;
118+
}
119+
120+
if (optional_host) {
121+
return createSessionWithHost(std::move(addresses), optional_host);
122+
}
123+
124+
auto host = chooseHost(addresses.peer_);
125+
if (host == nullptr) {
126+
ENVOY_LOG(debug, "cannot find any valid host.");
127+
cluster_.info()->stats().upstream_cx_none_healthy_.inc();
128+
return nullptr;
129+
}
130+
return createSessionWithHost(std::move(addresses), host);
131+
}
132+
133+
UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithHost(
134+
Network::UdpRecvData::LocalPeerAddresses&& addresses,
135+
const Upstream::HostConstSharedPtr& host) {
136+
ASSERT(host);
137+
auto new_session = std::make_unique<ActiveSession>(*this, std::move(addresses), host);
138+
auto new_session_ptr = new_session.get();
139+
sessions_.emplace(std::move(new_session));
140+
host_to_sessions_[host.get()].emplace(new_session_ptr);
141+
return new_session_ptr;
142+
}
143+
144+
Upstream::HostConstSharedPtr UdpProxyFilter::ClusterInfo::chooseHost(
145+
const Network::Address::InstanceConstSharedPtr& peer_address) const {
146+
UdpLoadBalancerContext context(filter_.config_->hashPolicy(), peer_address);
147+
Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context);
148+
return host;
149+
}
150+
151+
UdpProxyFilter::StickySessionClusterInfo::StickySessionClusterInfo(
152+
UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster)
153+
: ClusterInfo(filter, cluster,
154+
SessionStorageType(1, HeterogeneousActiveSessionHash(false),
155+
HeterogeneousActiveSessionEqual(false))) {}
156+
157+
Network::FilterStatus UdpProxyFilter::StickySessionClusterInfo::onData(Network::UdpRecvData& data) {
89158
const auto active_session_it = sessions_.find(data.addresses_);
90159
ActiveSession* active_session;
91160
if (active_session_it == sessions_.end()) {
92-
if (!cluster_.info()
93-
->resourceManager(Upstream::ResourcePriority::Default)
94-
.connections()
95-
.canCreate()) {
96-
cluster_.info()->stats().upstream_cx_overflow_.inc();
161+
active_session = createSession(std::move(data.addresses_));
162+
if (active_session == nullptr) {
97163
return Network::FilterStatus::StopIteration;
98164
}
99-
100-
UdpLoadBalancerContext context(filter_.config_->hashPolicy(), data.addresses_.peer_);
101-
Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context);
102-
if (host == nullptr) {
103-
ENVOY_LOG(debug, "cannot find any valid host. failed to create a session.");
104-
cluster_.info()->stats().upstream_cx_none_healthy_.inc();
105-
return Network::FilterStatus::StopIteration;
106-
}
107-
108-
active_session = createSession(std::move(data.addresses_), host);
109165
} else {
110166
active_session = active_session_it->get();
111167
if (active_session->host().health() == Upstream::Host::Health::Unhealthy) {
112168
// If a host becomes unhealthy, we optimally would like to replace it with a new session
113169
// to a healthy host. We may eventually want to make this behavior configurable, but for now
114170
// this will be the universal behavior.
115-
116-
UdpLoadBalancerContext context(filter_.config_->hashPolicy(), data.addresses_.peer_);
117-
Upstream::HostConstSharedPtr host = cluster_.loadBalancer().chooseHost(&context);
171+
auto host = chooseHost(data.addresses_.peer_);
118172
if (host != nullptr && host->health() != Upstream::Host::Health::Unhealthy &&
119173
host.get() != &active_session->host()) {
120174
ENVOY_LOG(debug, "upstream session unhealthy, recreating the session");
@@ -132,28 +186,40 @@ Network::FilterStatus UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData&
132186
return Network::FilterStatus::StopIteration;
133187
}
134188

135-
UdpProxyFilter::ActiveSession*
136-
UdpProxyFilter::ClusterInfo::createSession(Network::UdpRecvData::LocalPeerAddresses&& addresses,
137-
const Upstream::HostConstSharedPtr& host) {
138-
auto new_session = std::make_unique<ActiveSession>(*this, std::move(addresses), host);
139-
auto new_session_ptr = new_session.get();
140-
sessions_.emplace(std::move(new_session));
141-
host_to_sessions_[host.get()].emplace(new_session_ptr);
142-
return new_session_ptr;
143-
}
189+
UdpProxyFilter::PerPacketLoadBalancingClusterInfo::PerPacketLoadBalancingClusterInfo(
190+
UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster)
191+
: ClusterInfo(filter, cluster,
192+
SessionStorageType(1, HeterogeneousActiveSessionHash(true),
193+
HeterogeneousActiveSessionEqual(true))) {}
194+
195+
Network::FilterStatus
196+
UdpProxyFilter::PerPacketLoadBalancingClusterInfo::onData(Network::UdpRecvData& data) {
197+
auto host = chooseHost(data.addresses_.peer_);
198+
if (host == nullptr) {
199+
ENVOY_LOG(debug, "cannot find any valid host.");
200+
cluster_.info()->stats().upstream_cx_none_healthy_.inc();
201+
return Network::FilterStatus::StopIteration;
202+
}
144203

145-
void UdpProxyFilter::ClusterInfo::removeSession(const ActiveSession* session) {
146-
// First remove from the host to sessions map.
147-
ASSERT(host_to_sessions_[&session->host()].count(session) == 1);
148-
auto host_sessions_it = host_to_sessions_.find(&session->host());
149-
host_sessions_it->second.erase(session);
150-
if (host_sessions_it->second.empty()) {
151-
host_to_sessions_.erase(host_sessions_it);
204+
ENVOY_LOG(debug, "selected {} host as upstream.", host->address()->asStringView());
205+
206+
LocalPeerHostAddresses key{data.addresses_, *host};
207+
const auto active_session_it = sessions_.find(key);
208+
ActiveSession* active_session;
209+
if (active_session_it == sessions_.end()) {
210+
active_session = createSession(std::move(data.addresses_), host);
211+
if (active_session == nullptr) {
212+
return Network::FilterStatus::StopIteration;
213+
}
214+
} else {
215+
active_session = active_session_it->get();
216+
ENVOY_LOG(trace, "found already existing session on host {}.",
217+
active_session->host().address()->asStringView());
152218
}
153219

154-
// Now remove it from the primary map.
155-
ASSERT(sessions_.count(session) == 1);
156-
sessions_.erase(session);
220+
active_session->write(*data.buffer_);
221+
222+
return Network::FilterStatus::StopIteration;
157223
}
158224

159225
UdpProxyFilter::ActiveSession::ActiveSession(ClusterInfo& cluster,

0 commit comments

Comments
 (0)