2929import io .grpc .MethodDescriptor ;
3030import io .grpc .internal .SharedResourceHolder .Resource ;
3131import io .grpc .netty .NettyChannelBuilder ;
32- import io .netty .channel .EventLoopGroup ;
33- import io .netty .channel .nio .NioEventLoopGroup ;
34- import io .netty .channel .socket .nio .NioSocketChannel ;
35- import io .netty .util .concurrent .DefaultThreadFactory ;
3632import java .time .Duration ;
37- import java .util .Optional ;
3833import java .util .concurrent .ConcurrentMap ;
34+ import java .util .logging .Level ;
35+ import java .util .logging .Logger ;
3936import javax .annotation .concurrent .ThreadSafe ;
4037
4138/**
6158public final class S2AHandshakerServiceChannel {
6259 private static final ConcurrentMap <String , Resource <Channel >> SHARED_RESOURCE_CHANNELS =
6360 Maps .newConcurrentMap ();
64- private static final Duration DELEGATE_TERMINATION_TIMEOUT = Duration .ofSeconds (2 );
6561 private static final Duration CHANNEL_SHUTDOWN_TIMEOUT = Duration .ofSeconds (10 );
6662
6763 /**
@@ -74,8 +70,9 @@ public final class S2AHandshakerServiceChannel {
7470 * running at {@code s2aAddress}.
7571 */
7672 public static Resource <Channel > getChannelResource (
77- String s2aAddress , Optional < ChannelCredentials > s2aChannelCredentials ) {
73+ String s2aAddress , ChannelCredentials s2aChannelCredentials ) {
7874 checkNotNull (s2aAddress );
75+ checkNotNull (s2aChannelCredentials );
7976 return SHARED_RESOURCE_CHANNELS .computeIfAbsent (
8077 s2aAddress , channelResource -> new ChannelResource (s2aAddress , s2aChannelCredentials ));
8178 }
@@ -87,49 +84,31 @@ public static Resource<Channel> getChannelResource(
8784 */
8885 private static class ChannelResource implements Resource <Channel > {
8986 private final String targetAddress ;
90- private final Optional < ChannelCredentials > channelCredentials ;
87+ private final ChannelCredentials channelCredentials ;
9188
92- public ChannelResource (String targetAddress , Optional < ChannelCredentials > channelCredentials ) {
89+ public ChannelResource (String targetAddress , ChannelCredentials channelCredentials ) {
9390 this .targetAddress = targetAddress ;
9491 this .channelCredentials = channelCredentials ;
9592 }
9693
9794 /**
98- * Creates a {@code EventLoopHoldingChannel} instance to the service running at {@code
99- * targetAddress}. This channel uses a dedicated thread pool for its {@code EventLoopGroup}
100- * instance to avoid blocking.
95+ * Creates a {@code HandshakerServiceChannel} instance to the service running at {@code
96+ * targetAddress}.
10197 */
10298 @ Override
10399 public Channel create () {
104- EventLoopGroup eventLoopGroup =
105- new NioEventLoopGroup (1 , new DefaultThreadFactory ("S2A channel pool" , true ));
106- ManagedChannel channel = null ;
107- if (channelCredentials .isPresent ()) {
108- // Create a secure channel.
109- channel =
110- NettyChannelBuilder .forTarget (targetAddress , channelCredentials .get ())
111- .channelType (NioSocketChannel .class )
112- .directExecutor ()
113- .eventLoopGroup (eventLoopGroup )
114- .build ();
115- } else {
116- // Create a plaintext channel.
117- channel =
118- NettyChannelBuilder .forTarget (targetAddress )
119- .channelType (NioSocketChannel .class )
120- .directExecutor ()
121- .eventLoopGroup (eventLoopGroup )
122- .usePlaintext ()
123- .build ();
124- }
125- return EventLoopHoldingChannel .create (channel , eventLoopGroup );
100+ ManagedChannel channel =
101+ NettyChannelBuilder .forTarget (targetAddress , channelCredentials )
102+ .directExecutor ()
103+ .build ();
104+ return HandshakerServiceChannel .create (channel );
126105 }
127106
128- /** Destroys a {@code EventLoopHoldingChannel } instance. */
107+ /** Destroys a {@code HandshakerServiceChannel } instance. */
129108 @ Override
130109 public void close (Channel instanceChannel ) {
131110 checkNotNull (instanceChannel );
132- EventLoopHoldingChannel channel = (EventLoopHoldingChannel ) instanceChannel ;
111+ HandshakerServiceChannel channel = (HandshakerServiceChannel ) instanceChannel ;
133112 channel .close ();
134113 }
135114
@@ -140,23 +119,21 @@ public String toString() {
140119 }
141120
142121 /**
143- * Manages a channel using a {@link ManagedChannel} instance that belong to the {@code
144- * EventLoopGroup} thread pool.
122+ * Manages a channel using a {@link ManagedChannel} instance.
145123 */
146124 @ VisibleForTesting
147- static class EventLoopHoldingChannel extends Channel {
125+ static class HandshakerServiceChannel extends Channel {
126+ private static final Logger logger =
127+ Logger .getLogger (S2AHandshakerServiceChannel .class .getName ());
148128 private final ManagedChannel delegate ;
149- private final EventLoopGroup eventLoopGroup ;
150129
151- static EventLoopHoldingChannel create (ManagedChannel delegate , EventLoopGroup eventLoopGroup ) {
130+ static HandshakerServiceChannel create (ManagedChannel delegate ) {
152131 checkNotNull (delegate );
153- checkNotNull (eventLoopGroup );
154- return new EventLoopHoldingChannel (delegate , eventLoopGroup );
132+ return new HandshakerServiceChannel (delegate );
155133 }
156134
157- private EventLoopHoldingChannel (ManagedChannel delegate , EventLoopGroup eventLoopGroup ) {
135+ private HandshakerServiceChannel (ManagedChannel delegate ) {
158136 this .delegate = delegate ;
159- this .eventLoopGroup = eventLoopGroup ;
160137 }
161138
162139 /**
@@ -178,16 +155,12 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
178155 @ SuppressWarnings ("FutureReturnValueIgnored" )
179156 public void close () {
180157 delegate .shutdownNow ();
181- boolean isDelegateTerminated ;
182158 try {
183- isDelegateTerminated =
184- delegate .awaitTermination (DELEGATE_TERMINATION_TIMEOUT .getSeconds (), SECONDS );
159+ delegate .awaitTermination (CHANNEL_SHUTDOWN_TIMEOUT .getSeconds (), SECONDS );
185160 } catch (InterruptedException e ) {
186- isDelegateTerminated = false ;
161+ Thread .currentThread ().interrupt ();
162+ logger .log (Level .WARNING , "Channel to S2A was not shutdown." );
187163 }
188- long quietPeriodSeconds = isDelegateTerminated ? 0 : 1 ;
189- eventLoopGroup .shutdownGracefully (
190- quietPeriodSeconds , CHANNEL_SHUTDOWN_TIMEOUT .getSeconds (), SECONDS );
191164 }
192165 }
193166
0 commit comments