Skip to content

Commit 778cfb4

Browse files
Merge branch 'grpc:master' into master
2 parents fef4c92 + d8f73e0 commit 778cfb4

12 files changed

Lines changed: 129 additions & 152 deletions

netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
package io.grpc.netty;
1818

1919
import io.grpc.ChannelLogger;
20+
import io.grpc.internal.ObjectPool;
2021
import io.grpc.netty.ProtocolNegotiators.ClientTlsHandler;
2122
import io.grpc.netty.ProtocolNegotiators.GrpcNegotiationHandler;
2223
import io.grpc.netty.ProtocolNegotiators.WaitUntilActiveHandler;
2324
import io.netty.channel.ChannelHandler;
2425
import io.netty.handler.ssl.SslContext;
2526
import io.netty.util.AsciiString;
27+
import java.util.concurrent.Executor;
2628

2729
/**
2830
* Internal accessor for {@link ProtocolNegotiators}.
@@ -35,9 +37,12 @@ private InternalProtocolNegotiators() {}
3537
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
3638
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
3739
* may happen immediately, even before the TLS Handshake is complete.
40+
* @param executorPool a dedicated {@link Executor} pool for time-consuming TLS tasks
3841
*/
39-
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext) {
40-
final io.grpc.netty.ProtocolNegotiator negotiator = ProtocolNegotiators.tls(sslContext);
42+
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext,
43+
ObjectPool<? extends Executor> executorPool) {
44+
final io.grpc.netty.ProtocolNegotiator negotiator = ProtocolNegotiators.tls(sslContext,
45+
executorPool);
4146
final class TlsNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
4247

4348
@Override
@@ -58,6 +63,15 @@ public void close() {
5863

5964
return new TlsNegotiator();
6065
}
66+
67+
/**
68+
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
69+
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
70+
* may happen immediately, even before the TLS Handshake is complete.
71+
*/
72+
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext) {
73+
return tls(sslContext, null);
74+
}
6175

6276
/**
6377
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will be

s2a/src/main/java/io/grpc/s2a/MtlsToS2AChannelCredentials.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,16 @@
2121
import static com.google.common.base.Strings.isNullOrEmpty;
2222

2323
import io.grpc.ChannelCredentials;
24+
import io.grpc.ExperimentalApi;
2425
import io.grpc.TlsChannelCredentials;
25-
import io.grpc.util.AdvancedTlsX509KeyManager;
26-
import io.grpc.util.AdvancedTlsX509TrustManager;
2726
import java.io.File;
2827
import java.io.IOException;
29-
import java.security.GeneralSecurityException;
3028

3129
/**
3230
* Configures an {@code S2AChannelCredentials.Builder} instance with credentials used to establish a
3331
* connection with the S2A to support talking to the S2A over mTLS.
3432
*/
33+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11533")
3534
public final class MtlsToS2AChannelCredentials {
3635
/**
3736
* Creates a {@code S2AChannelCredentials.Builder} builder, that talks to the S2A over mTLS.
@@ -42,7 +41,7 @@ public final class MtlsToS2AChannelCredentials {
4241
* @param trustBundlePath the path to the trust bundle PEM.
4342
* @return a {@code MtlsToS2AChannelCredentials.Builder} instance.
4443
*/
45-
public static Builder createBuilder(
44+
public static Builder newBuilder(
4645
String s2aAddress, String privateKeyPath, String certChainPath, String trustBundlePath) {
4746
checkArgument(!isNullOrEmpty(s2aAddress), "S2A address must not be null or empty.");
4847
checkArgument(!isNullOrEmpty(privateKeyPath), "privateKeyPath must not be null or empty.");
@@ -66,7 +65,7 @@ public static final class Builder {
6665
this.trustBundlePath = trustBundlePath;
6766
}
6867

69-
public S2AChannelCredentials.Builder build() throws GeneralSecurityException, IOException {
68+
public S2AChannelCredentials.Builder build() throws IOException {
7069
checkState(!isNullOrEmpty(s2aAddress), "S2A address must not be null or empty.");
7170
checkState(!isNullOrEmpty(privateKeyPath), "privateKeyPath must not be null or empty.");
7271
checkState(!isNullOrEmpty(certChainPath), "certChainPath must not be null or empty.");
@@ -75,19 +74,13 @@ public S2AChannelCredentials.Builder build() throws GeneralSecurityException, IO
7574
File certChainFile = new File(certChainPath);
7675
File trustBundleFile = new File(trustBundlePath);
7776

78-
AdvancedTlsX509KeyManager keyManager = new AdvancedTlsX509KeyManager();
79-
keyManager.updateIdentityCredentials(certChainFile, privateKeyFile);
80-
81-
AdvancedTlsX509TrustManager trustManager = AdvancedTlsX509TrustManager.newBuilder().build();
82-
trustManager.updateTrustCredentials(trustBundleFile);
83-
8477
ChannelCredentials channelToS2ACredentials =
8578
TlsChannelCredentials.newBuilder()
86-
.keyManager(keyManager)
87-
.trustManager(trustManager)
79+
.keyManager(certChainFile, privateKeyFile)
80+
.trustManager(trustBundleFile)
8881
.build();
8982

90-
return S2AChannelCredentials.createBuilder(s2aAddress)
83+
return S2AChannelCredentials.newBuilder(s2aAddress)
9184
.setS2AChannelCredentials(channelToS2ACredentials);
9285
}
9386
}

s2a/src/main/java/io/grpc/s2a/S2AChannelCredentials.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,31 @@
2424
import com.google.errorprone.annotations.CanIgnoreReturnValue;
2525
import io.grpc.Channel;
2626
import io.grpc.ChannelCredentials;
27+
import io.grpc.ExperimentalApi;
28+
import io.grpc.InsecureChannelCredentials;
2729
import io.grpc.internal.ObjectPool;
2830
import io.grpc.internal.SharedResourcePool;
2931
import io.grpc.netty.InternalNettyChannelCredentials;
3032
import io.grpc.netty.InternalProtocolNegotiator;
3133
import io.grpc.s2a.channel.S2AHandshakerServiceChannel;
3234
import io.grpc.s2a.handshaker.S2AIdentity;
3335
import io.grpc.s2a.handshaker.S2AProtocolNegotiatorFactory;
34-
import java.util.Optional;
3536
import javax.annotation.concurrent.NotThreadSafe;
3637
import org.checkerframework.checker.nullness.qual.Nullable;
3738

3839
/**
3940
* Configures gRPC to use S2A for transport security when establishing a secure channel. Only for
4041
* use on the client side of a gRPC connection.
4142
*/
43+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/11533")
4244
public final class S2AChannelCredentials {
4345
/**
4446
* Creates a channel credentials builder for establishing an S2A-secured connection.
4547
*
4648
* @param s2aAddress the address of the S2A server used to secure the connection.
4749
* @return a {@code S2AChannelCredentials.Builder} instance.
4850
*/
49-
public static Builder createBuilder(String s2aAddress) {
51+
public static Builder newBuilder(String s2aAddress) {
5052
checkArgument(!isNullOrEmpty(s2aAddress), "S2A address must not be null or empty.");
5153
return new Builder(s2aAddress);
5254
}
@@ -56,13 +58,13 @@ public static Builder createBuilder(String s2aAddress) {
5658
public static final class Builder {
5759
private final String s2aAddress;
5860
private ObjectPool<Channel> s2aChannelPool;
59-
private Optional<ChannelCredentials> s2aChannelCredentials;
61+
private ChannelCredentials s2aChannelCredentials;
6062
private @Nullable S2AIdentity localIdentity = null;
6163

6264
Builder(String s2aAddress) {
6365
this.s2aAddress = s2aAddress;
6466
this.s2aChannelPool = null;
65-
this.s2aChannelCredentials = Optional.empty();
67+
this.s2aChannelCredentials = InsecureChannelCredentials.create();
6668
}
6769

6870
/**
@@ -107,7 +109,7 @@ public Builder setLocalUid(String localUid) {
107109
/** Sets the credentials to be used when connecting to the S2A. */
108110
@CanIgnoreReturnValue
109111
public Builder setS2AChannelCredentials(ChannelCredentials s2aChannelCredentials) {
110-
this.s2aChannelCredentials = Optional.of(s2aChannelCredentials);
112+
this.s2aChannelCredentials = s2aChannelCredentials;
111113
return this;
112114
}
113115

s2a/src/main/java/io/grpc/s2a/channel/S2AHandshakerServiceChannel.java

Lines changed: 25 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,10 @@
2929
import io.grpc.MethodDescriptor;
3030
import io.grpc.internal.SharedResourceHolder.Resource;
3131
import io.grpc.netty.NettyChannelBuilder;
32-
import io.netty.channel.EventLoopGroup;
33-
import io.netty.channel.nio.NioEventLoopGroup;
34-
import io.netty.channel.socket.nio.NioSocketChannel;
35-
import io.netty.util.concurrent.DefaultThreadFactory;
3632
import java.time.Duration;
37-
import java.util.Optional;
3833
import java.util.concurrent.ConcurrentMap;
34+
import java.util.logging.Level;
35+
import java.util.logging.Logger;
3936
import javax.annotation.concurrent.ThreadSafe;
4037

4138
/**
@@ -61,7 +58,6 @@
6158
public final class S2AHandshakerServiceChannel {
6259
private static final ConcurrentMap<String, Resource<Channel>> SHARED_RESOURCE_CHANNELS =
6360
Maps.newConcurrentMap();
64-
private static final Duration DELEGATE_TERMINATION_TIMEOUT = Duration.ofSeconds(2);
6561
private static final Duration CHANNEL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
6662

6763
/**
@@ -74,8 +70,9 @@ public final class S2AHandshakerServiceChannel {
7470
* running at {@code s2aAddress}.
7571
*/
7672
public static Resource<Channel> getChannelResource(
77-
String s2aAddress, Optional<ChannelCredentials> s2aChannelCredentials) {
73+
String s2aAddress, ChannelCredentials s2aChannelCredentials) {
7874
checkNotNull(s2aAddress);
75+
checkNotNull(s2aChannelCredentials);
7976
return SHARED_RESOURCE_CHANNELS.computeIfAbsent(
8077
s2aAddress, channelResource -> new ChannelResource(s2aAddress, s2aChannelCredentials));
8178
}
@@ -87,49 +84,31 @@ public static Resource<Channel> getChannelResource(
8784
*/
8885
private static class ChannelResource implements Resource<Channel> {
8986
private final String targetAddress;
90-
private final Optional<ChannelCredentials> channelCredentials;
87+
private final ChannelCredentials channelCredentials;
9188

92-
public ChannelResource(String targetAddress, Optional<ChannelCredentials> channelCredentials) {
89+
public ChannelResource(String targetAddress, ChannelCredentials channelCredentials) {
9390
this.targetAddress = targetAddress;
9491
this.channelCredentials = channelCredentials;
9592
}
9693

9794
/**
98-
* Creates a {@code EventLoopHoldingChannel} instance to the service running at {@code
99-
* targetAddress}. This channel uses a dedicated thread pool for its {@code EventLoopGroup}
100-
* instance to avoid blocking.
95+
* Creates a {@code HandshakerServiceChannel} instance to the service running at {@code
96+
* targetAddress}.
10197
*/
10298
@Override
10399
public Channel create() {
104-
EventLoopGroup eventLoopGroup =
105-
new NioEventLoopGroup(1, new DefaultThreadFactory("S2A channel pool", true));
106-
ManagedChannel channel = null;
107-
if (channelCredentials.isPresent()) {
108-
// Create a secure channel.
109-
channel =
110-
NettyChannelBuilder.forTarget(targetAddress, channelCredentials.get())
111-
.channelType(NioSocketChannel.class)
112-
.directExecutor()
113-
.eventLoopGroup(eventLoopGroup)
114-
.build();
115-
} else {
116-
// Create a plaintext channel.
117-
channel =
118-
NettyChannelBuilder.forTarget(targetAddress)
119-
.channelType(NioSocketChannel.class)
120-
.directExecutor()
121-
.eventLoopGroup(eventLoopGroup)
122-
.usePlaintext()
123-
.build();
124-
}
125-
return EventLoopHoldingChannel.create(channel, eventLoopGroup);
100+
ManagedChannel channel =
101+
NettyChannelBuilder.forTarget(targetAddress, channelCredentials)
102+
.directExecutor()
103+
.build();
104+
return HandshakerServiceChannel.create(channel);
126105
}
127106

128-
/** Destroys a {@code EventLoopHoldingChannel} instance. */
107+
/** Destroys a {@code HandshakerServiceChannel} instance. */
129108
@Override
130109
public void close(Channel instanceChannel) {
131110
checkNotNull(instanceChannel);
132-
EventLoopHoldingChannel channel = (EventLoopHoldingChannel) instanceChannel;
111+
HandshakerServiceChannel channel = (HandshakerServiceChannel) instanceChannel;
133112
channel.close();
134113
}
135114

@@ -140,23 +119,21 @@ public String toString() {
140119
}
141120

142121
/**
143-
* Manages a channel using a {@link ManagedChannel} instance that belong to the {@code
144-
* EventLoopGroup} thread pool.
122+
* Manages a channel using a {@link ManagedChannel} instance.
145123
*/
146124
@VisibleForTesting
147-
static class EventLoopHoldingChannel extends Channel {
125+
static class HandshakerServiceChannel extends Channel {
126+
private static final Logger logger =
127+
Logger.getLogger(S2AHandshakerServiceChannel.class.getName());
148128
private final ManagedChannel delegate;
149-
private final EventLoopGroup eventLoopGroup;
150129

151-
static EventLoopHoldingChannel create(ManagedChannel delegate, EventLoopGroup eventLoopGroup) {
130+
static HandshakerServiceChannel create(ManagedChannel delegate) {
152131
checkNotNull(delegate);
153-
checkNotNull(eventLoopGroup);
154-
return new EventLoopHoldingChannel(delegate, eventLoopGroup);
132+
return new HandshakerServiceChannel(delegate);
155133
}
156134

157-
private EventLoopHoldingChannel(ManagedChannel delegate, EventLoopGroup eventLoopGroup) {
135+
private HandshakerServiceChannel(ManagedChannel delegate) {
158136
this.delegate = delegate;
159-
this.eventLoopGroup = eventLoopGroup;
160137
}
161138

162139
/**
@@ -178,16 +155,12 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
178155
@SuppressWarnings("FutureReturnValueIgnored")
179156
public void close() {
180157
delegate.shutdownNow();
181-
boolean isDelegateTerminated;
182158
try {
183-
isDelegateTerminated =
184-
delegate.awaitTermination(DELEGATE_TERMINATION_TIMEOUT.getSeconds(), SECONDS);
159+
delegate.awaitTermination(CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
185160
} catch (InterruptedException e) {
186-
isDelegateTerminated = false;
161+
Thread.currentThread().interrupt();
162+
logger.log(Level.WARNING, "Channel to S2A was not shutdown.");
187163
}
188-
long quietPeriodSeconds = isDelegateTerminated ? 0 : 1;
189-
eventLoopGroup.shutdownGracefully(
190-
quietPeriodSeconds, CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
191164
}
192165
}
193166

s2a/src/main/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import com.google.common.util.concurrent.MoreExecutors;
3030
import com.google.errorprone.annotations.ThreadSafe;
3131
import io.grpc.Channel;
32+
import io.grpc.internal.GrpcUtil;
3233
import io.grpc.internal.ObjectPool;
34+
import io.grpc.internal.SharedResourcePool;
3335
import io.grpc.netty.GrpcHttp2ConnectionHandler;
3436
import io.grpc.netty.InternalProtocolNegotiator;
3537
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
@@ -227,7 +229,10 @@ protected void handlerAdded0(ChannelHandlerContext ctx) {
227229
@Override
228230
public void onSuccess(SslContext sslContext) {
229231
ChannelHandler handler =
230-
InternalProtocolNegotiators.tls(sslContext).newHandler(grpcHandler);
232+
InternalProtocolNegotiators.tls(
233+
sslContext,
234+
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR))
235+
.newHandler(grpcHandler);
231236

232237
// Remove the bufferReads handler and delegate the rest of the handshake to the TLS
233238
// handler.

s2a/src/main/java/io/grpc/s2a/handshaker/S2ATrustManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ private void checkPeerTrusted(X509Certificate[] chain, boolean isCheckingClientC
121121
try {
122122
resp = stub.send(reqBuilder.build());
123123
} catch (IOException | InterruptedException e) {
124+
if (e instanceof InterruptedException) {
125+
Thread.currentThread().interrupt();
126+
}
124127
throw new CertificateException("Failed to send request to S2A.", e);
125128
}
126129
if (resp.hasStatus() && resp.getStatus().getCode() != 0) {

0 commit comments

Comments
 (0)