@@ -27,8 +27,13 @@ void UdpProxyFilter::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster)
2727 }
2828
2929 ENVOY_LOG (debug, " udp proxy: attaching to cluster {}" , cluster.info ()->name ());
30- ASSERT (cluster_info_ == absl::nullopt || &cluster_info_.value ().cluster_ != &cluster);
31- cluster_info_.emplace (*this , cluster);
30+ ASSERT (cluster_info_ == absl::nullopt || &cluster_info_.value ()->cluster_ != &cluster);
31+
32+ if (config_->usingPerPacketLoadBalancing ()) {
33+ cluster_info_.emplace (std::make_unique<PerPacketLoadBalancingClusterInfo>(*this , cluster));
34+ } else {
35+ cluster_info_.emplace (std::make_unique<StickySessionClusterInfo>(*this , cluster));
36+ }
3237}
3338
3439void UdpProxyFilter::onClusterRemoval (const std::string& cluster) {
@@ -46,7 +51,7 @@ Network::FilterStatus UdpProxyFilter::onData(Network::UdpRecvData& data) {
4651 return Network::FilterStatus::StopIteration;
4752 }
4853
49- return cluster_info_.value (). onData (data);
54+ return cluster_info_.value ()-> onData (data);
5055}
5156
5257Network::FilterStatus UdpProxyFilter::onReceiveError (Api::IoError::IoErrorCode) {
@@ -56,9 +61,10 @@ Network::FilterStatus UdpProxyFilter::onReceiveError(Api::IoError::IoErrorCode)
5661}
5762
5863UdpProxyFilter::ClusterInfo::ClusterInfo (UdpProxyFilter& filter,
59- Upstream::ThreadLocalCluster& cluster)
64+ Upstream::ThreadLocalCluster& cluster,
65+ SessionStorageType&& sessions)
6066 : filter_(filter), cluster_(cluster),
61- cluster_stats_(generateStats(cluster.info()->statsScope())),
67+ cluster_stats_(generateStats(cluster.info()->statsScope())), sessions_(std::move(sessions)),
6268 member_update_cb_handle_(cluster.prioritySet().addMemberUpdateCb(
6369 [this ](const Upstream::HostVector&, const Upstream::HostVector& hosts_removed) {
6470 for (const auto & host : hosts_removed) {
@@ -85,36 +91,84 @@ UdpProxyFilter::ClusterInfo::~ClusterInfo() {
8591 ASSERT (host_to_sessions_.empty ());
8692}
8793
88- Network::FilterStatus UdpProxyFilter::ClusterInfo::onData (Network::UdpRecvData& data) {
94+ void UdpProxyFilter::ClusterInfo::removeSession (const ActiveSession* session) {
95+ // First remove from the host to sessions map.
96+ ASSERT (host_to_sessions_[&session->host ()].count (session) == 1 );
97+ auto host_sessions_it = host_to_sessions_.find (&session->host ());
98+ host_sessions_it->second .erase (session);
99+ if (host_sessions_it->second .empty ()) {
100+ host_to_sessions_.erase (host_sessions_it);
101+ }
102+
103+ // Now remove it from the primary map.
104+ ASSERT (sessions_.count (session) == 1 );
105+ sessions_.erase (session);
106+ }
107+
108+ UdpProxyFilter::ActiveSession*
109+ UdpProxyFilter::ClusterInfo::createSession (Network::UdpRecvData::LocalPeerAddresses&& addresses,
110+ const Upstream::HostConstSharedPtr& optional_host) {
111+ if (!cluster_.info ()
112+ ->resourceManager (Upstream::ResourcePriority::Default)
113+ .connections ()
114+ .canCreate ()) {
115+ ENVOY_LOG (debug, " cannot create new connection." );
116+ cluster_.info ()->stats ().upstream_cx_overflow_ .inc ();
117+ return nullptr ;
118+ }
119+
120+ if (optional_host) {
121+ return createSessionWithHost (std::move (addresses), optional_host);
122+ }
123+
124+ auto host = chooseHost (addresses.peer_ );
125+ if (host == nullptr ) {
126+ ENVOY_LOG (debug, " cannot find any valid host." );
127+ cluster_.info ()->stats ().upstream_cx_none_healthy_ .inc ();
128+ return nullptr ;
129+ }
130+ return createSessionWithHost (std::move (addresses), host);
131+ }
132+
133+ UdpProxyFilter::ActiveSession* UdpProxyFilter::ClusterInfo::createSessionWithHost (
134+ Network::UdpRecvData::LocalPeerAddresses&& addresses,
135+ const Upstream::HostConstSharedPtr& host) {
136+ ASSERT (host);
137+ auto new_session = std::make_unique<ActiveSession>(*this , std::move (addresses), host);
138+ auto new_session_ptr = new_session.get ();
139+ sessions_.emplace (std::move (new_session));
140+ host_to_sessions_[host.get ()].emplace (new_session_ptr);
141+ return new_session_ptr;
142+ }
143+
144+ Upstream::HostConstSharedPtr UdpProxyFilter::ClusterInfo::chooseHost (
145+ const Network::Address::InstanceConstSharedPtr& peer_address) const {
146+ UdpLoadBalancerContext context (filter_.config_ ->hashPolicy (), peer_address);
147+ Upstream::HostConstSharedPtr host = cluster_.loadBalancer ().chooseHost (&context);
148+ return host;
149+ }
150+
151+ UdpProxyFilter::StickySessionClusterInfo::StickySessionClusterInfo (
152+ UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster)
153+ : ClusterInfo(filter, cluster,
154+ SessionStorageType (1 , HeterogeneousActiveSessionHash(false ),
155+ HeterogeneousActiveSessionEqual(false ))) {}
156+
157+ Network::FilterStatus UdpProxyFilter::StickySessionClusterInfo::onData (Network::UdpRecvData& data) {
89158 const auto active_session_it = sessions_.find (data.addresses_ );
90159 ActiveSession* active_session;
91160 if (active_session_it == sessions_.end ()) {
92- if (!cluster_.info ()
93- ->resourceManager (Upstream::ResourcePriority::Default)
94- .connections ()
95- .canCreate ()) {
96- cluster_.info ()->stats ().upstream_cx_overflow_ .inc ();
161+ active_session = createSession (std::move (data.addresses_ ));
162+ if (active_session == nullptr ) {
97163 return Network::FilterStatus::StopIteration;
98164 }
99-
100- UdpLoadBalancerContext context (filter_.config_ ->hashPolicy (), data.addresses_ .peer_ );
101- Upstream::HostConstSharedPtr host = cluster_.loadBalancer ().chooseHost (&context);
102- if (host == nullptr ) {
103- ENVOY_LOG (debug, " cannot find any valid host. failed to create a session." );
104- cluster_.info ()->stats ().upstream_cx_none_healthy_ .inc ();
105- return Network::FilterStatus::StopIteration;
106- }
107-
108- active_session = createSession (std::move (data.addresses_ ), host);
109165 } else {
110166 active_session = active_session_it->get ();
111167 if (active_session->host ().health () == Upstream::Host::Health::Unhealthy) {
112168 // If a host becomes unhealthy, we optimally would like to replace it with a new session
113169 // to a healthy host. We may eventually want to make this behavior configurable, but for now
114170 // this will be the universal behavior.
115-
116- UdpLoadBalancerContext context (filter_.config_ ->hashPolicy (), data.addresses_ .peer_ );
117- Upstream::HostConstSharedPtr host = cluster_.loadBalancer ().chooseHost (&context);
171+ auto host = chooseHost (data.addresses_ .peer_ );
118172 if (host != nullptr && host->health () != Upstream::Host::Health::Unhealthy &&
119173 host.get () != &active_session->host ()) {
120174 ENVOY_LOG (debug, " upstream session unhealthy, recreating the session" );
@@ -132,28 +186,40 @@ Network::FilterStatus UdpProxyFilter::ClusterInfo::onData(Network::UdpRecvData&
132186 return Network::FilterStatus::StopIteration;
133187}
134188
135- UdpProxyFilter::ActiveSession*
136- UdpProxyFilter::ClusterInfo::createSession (Network::UdpRecvData::LocalPeerAddresses&& addresses,
137- const Upstream::HostConstSharedPtr& host) {
138- auto new_session = std::make_unique<ActiveSession>(*this , std::move (addresses), host);
139- auto new_session_ptr = new_session.get ();
140- sessions_.emplace (std::move (new_session));
141- host_to_sessions_[host.get ()].emplace (new_session_ptr);
142- return new_session_ptr;
143- }
189+ UdpProxyFilter::PerPacketLoadBalancingClusterInfo::PerPacketLoadBalancingClusterInfo (
190+ UdpProxyFilter& filter, Upstream::ThreadLocalCluster& cluster)
191+ : ClusterInfo(filter, cluster,
192+ SessionStorageType (1 , HeterogeneousActiveSessionHash(true ),
193+ HeterogeneousActiveSessionEqual(true ))) {}
194+
195+ Network::FilterStatus
196+ UdpProxyFilter::PerPacketLoadBalancingClusterInfo::onData (Network::UdpRecvData& data) {
197+ auto host = chooseHost (data.addresses_ .peer_ );
198+ if (host == nullptr ) {
199+ ENVOY_LOG (debug, " cannot find any valid host." );
200+ cluster_.info ()->stats ().upstream_cx_none_healthy_ .inc ();
201+ return Network::FilterStatus::StopIteration;
202+ }
144203
145- void UdpProxyFilter::ClusterInfo::removeSession (const ActiveSession* session) {
146- // First remove from the host to sessions map.
147- ASSERT (host_to_sessions_[&session->host ()].count (session) == 1 );
148- auto host_sessions_it = host_to_sessions_.find (&session->host ());
149- host_sessions_it->second .erase (session);
150- if (host_sessions_it->second .empty ()) {
151- host_to_sessions_.erase (host_sessions_it);
204+ ENVOY_LOG (debug, " selected {} host as upstream." , host->address ()->asStringView ());
205+
206+ LocalPeerHostAddresses key{data.addresses_ , *host};
207+ const auto active_session_it = sessions_.find (key);
208+ ActiveSession* active_session;
209+ if (active_session_it == sessions_.end ()) {
210+ active_session = createSession (std::move (data.addresses_ ), host);
211+ if (active_session == nullptr ) {
212+ return Network::FilterStatus::StopIteration;
213+ }
214+ } else {
215+ active_session = active_session_it->get ();
216+ ENVOY_LOG (trace, " found already existing session on host {}." ,
217+ active_session->host ().address ()->asStringView ());
152218 }
153219
154- // Now remove it from the primary map.
155- ASSERT (sessions_. count (session) == 1 );
156- sessions_. erase (session) ;
220+ active_session-> write (*data. buffer_ );
221+
222+ return Network::FilterStatus::StopIteration ;
157223}
158224
159225UdpProxyFilter::ActiveSession::ActiveSession (ClusterInfo& cluster,
0 commit comments