Open node connections asynchronously #35144
Pinging @elastic/es-distributed |
|
This was motivated by my work on #35095. In that issue we will need to introduce retries on connections. So I went ahead and started the work on async connections as part of that. |
s1monw
left a comment
this looks awesome, thanks for doing this. I left some comments
| private final CompletableContext<Void> closeContext = new CompletableContext<>(); | ||
|
|
||
| Netty4TcpChannel(Channel channel, String profile) { | ||
| this(channel, profile, completedConnectContext()); |
Why do we have the extra ctor here? Is it for testing? I would love to remove this one; it might be worth going the extra mile and doing this explicitly in every test.
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| class TcpTransportHandshaker { |
can be final and can have javadocs?!
| private final ConnectionProfile connectionProfile; | ||
| private final List<TcpChannel> channels; | ||
| private final ActionListener<NodeChannels> listener; | ||
| private final AtomicInteger pendingConnections; |
I'd use CountDown.java here instead for the pendingConnections, plus an extra boolean to record whether we failed or not. That is more straightforward and the impls will be simpler.
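To make the suggestion concrete, here is a minimal sketch of the count-down-plus-boolean pattern. It is not the code from this PR: `SimpleCountDown` is a hypothetical stand-in written for this example (it is not the Elasticsearch `CountDown` class), and `PendingConnectionTracker`, `channelConnected` and `channelFailed` are illustrative names. The boolean here records whether the listener has already been notified, which is one possible reading of the "failed or not" flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for a count-down helper (assumption, not the Elasticsearch class).
final class SimpleCountDown {
    private final AtomicInteger remaining;

    SimpleCountDown(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        this.remaining = new AtomicInteger(count);
    }

    // Returns true only for the single call that moves the counter to zero.
    boolean countDown() {
        while (true) {
            int current = remaining.get();
            if (current == 0) {
                return false;
            }
            if (remaining.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }
}

// Hypothetical tracker: notifies exactly one of the two callbacks, exactly once.
final class PendingConnectionTracker {
    private final SimpleCountDown countDown;
    private final AtomicBoolean notified = new AtomicBoolean(false);
    private final Runnable onAllConnected;
    private final Consumer<Exception> onFailure;

    PendingConnectionTracker(int channelCount, Runnable onAllConnected, Consumer<Exception> onFailure) {
        this.countDown = new SimpleCountDown(channelCount);
        this.onAllConnected = onAllConnected;
        this.onFailure = onFailure;
    }

    // Called once per channel that connected successfully.
    void channelConnected() {
        if (countDown.countDown() && notified.compareAndSet(false, true)) {
            onAllConnected.run();
        }
    }

    // Called on a channel failure or a timeout; only the first failure is reported.
    void channelFailed(Exception e) {
        if (notified.compareAndSet(false, true)) {
            onFailure.accept(e);
        }
    }
}
```

The counter answers "was this the last pending channel?" and the boolean answers "has the listener already been told?", so neither callback needs to inspect a raw integer.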
|
|
||
| @Override | ||
| public void onFailure(Exception e) { | ||
| CloseableChannel.closeChannels(channels, false); |
CloseableChannel.closeChannel can throw so should we call the listener first or use try/finally?
There was a problem hiding this comment.
CloseableChannel.closeChannel actually should not throw. CloseableChannel close signature does not include an exception.
static <C extends CloseableChannel> void closeChannels(List<C> channels, boolean blocking) {
    try {
        IOUtils.close(channels);
    } catch (IOException e) {
        // The CloseableChannel#close method does not throw IOException, so this should not occur.
        throw new UncheckedIOException(e);
    }
    if (blocking) {
        ArrayList<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
        for (final C channel : channels) {
            PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
            channel.addCloseListener(closeFuture);
            futures.add(closeFuture);
        }
        blockOnFutures(futures);
    }
}
Should I change that from UncheckedIOException to AssertionError? I can still do your suggested try/catch if you would like for safety.
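For reference, the try/finally variant under discussion could look roughly like the sketch below. It is not the PR's code: `FailureHandling` and `closeAndNotify` are hypothetical names, the close step is modelled as a plain `Runnable`, and the listener as a `Consumer<Exception>`, all assumptions made so the example stands alone.

```java
import java.util.function.Consumer;

// Hypothetical helper illustrating "close first, but always notify the listener".
final class FailureHandling {
    private FailureHandling() {}

    static void closeAndNotify(Runnable closeChannels, Exception failure, Consumer<Exception> onFailure) {
        try {
            // May throw an unchecked exception if closing unexpectedly fails.
            closeChannels.run();
        } catch (RuntimeException closeError) {
            // Keep the original failure as the primary cause and attach the close error to it.
            failure.addSuppressed(closeError);
        } finally {
            // Runs whether or not closing threw, so the listener is never left waiting.
            onFailure.accept(failure);
        }
    }
}
```

With this shape the listener still receives the original connection failure even if the close step throws, and the close error remains visible as a suppressed exception.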
| } | ||
| }); | ||
| } catch (Exception ex) { | ||
| CloseableChannel.closeChannels(channels, false); |
CloseableChannel.closeChannel can throw so should we call the listener first or use try/finally?
| public void onFailure(Exception ex) { | ||
| assert pendingConnections.get() != 0 : "Should not receive non-timeout connection exception if no connections pending."; | ||
| if (setFailed()) { | ||
| CloseableChannel.closeChannels(channels, false); |
CloseableChannel.closeChannel can throw so should we call the listener first or use try/finally?
|
|
||
| public void onTimeout() { | ||
| if (setFailed()) { | ||
| CloseableChannel.closeChannels(channels, false); |
CloseableChannel.closeChannel can throw so should we call the listener first or use try/finally?
| private final HandshakeRequestSender handshakeRequestSender; | ||
| private final HandshakeResponseSender handshakeResponseSender; | ||
|
|
||
| TcpTransportHandshaker(Version version, ThreadPool threadPool, HandshakeRequestSender handshakeRequestSender, |
I would see the value of a functional interface here if we had tests for it but it seems we don't. Did I miss it?
|
@s1monw - I made changes based on your review. The primary thing that needs to be resolved is what you want done in regards to |
s1monw
left a comment
left one comment LGTM otherwise
| } | ||
| }); | ||
|
|
||
| if (connectFuture != null) { |
can we just not make it nullable? it would be nice to have it just be there all the time?
| } catch (IOException e) { | ||
| // The CloseableChannel#close method does not throw IOException, so this should not occur. | ||
| throw new UncheckedIOException(e); | ||
| throw new AssertionError(e); |
This is related to elastic#29023. Additionally, at other points we have discussed a preference for removing the need to unnecessarily block threads for opening new node connections. This commit lays the groundwork for this by opening connections asynchronously at the transport level. We still block; however, this work will make it possible to eventually remove all blocking on new connections out of the TransportService and Transport.
This is a follow-up to #35144. That commit made the underlying connection opening process in TcpTransport asynchronous. However the method still blocked on the process being complete before returning. This commit moves the blocking to the ConnectionManager level. This is another step towards the top-level TransportService api being async.
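As a rough illustration of the layering these commit messages describe, the sketch below has an asynchronous lower layer complete a future while a caller-side wrapper does the blocking. It is only a sketch under stated assumptions: `BlockingConnectionOpener`, the `BiConsumer` standing in for the async transport, and the `String` used as a connection handle are all hypothetical and do not mirror the real `TcpTransport` or `ConnectionManager` APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical caller-side wrapper: the only place that blocks on the async open.
final class BlockingConnectionOpener {
    // Stand-in for an async transport: given a node id, it completes the future later.
    private final BiConsumer<String, CompletableFuture<String>> asyncTransport;

    BlockingConnectionOpener(BiConsumer<String, CompletableFuture<String>> asyncTransport) {
        this.asyncTransport = asyncTransport;
    }

    String openConnection(String nodeId, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // The transport call returns immediately; the connection arrives via the future.
        asyncTransport.accept(nodeId, future);
        try {
            // Blocking happens here, in the caller, not inside the transport.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while connecting to " + nodeId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("failed to connect to " + nodeId, e);
        }
    }
}
```

Because the lower layer never blocks, a later change could hand the future (or a listener) to callers directly and drop the blocking wrapper altogether.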