Add TcpChannel to unify Transport implementations #27132
Tim-Brooks merged 31 commits into elastic:master
Conversation
s1monw
left a comment
I did an initial pass. I like it. Left some comments.
    import org.elasticsearch.action.support.PlainActionFuture;
    public class PlainChannelFuture<Channel> extends PlainActionFuture<Channel> {
can we have javadocs for this? And can it be final?
    ListenableActionFuture<C> closeAsync();
    ListenableActionFuture<C> getCloseFuture();
At times we need to attach a close listener. And the future allows us to do that. But I presume that you want just a method to add close listeners instead.
    public interface TcpChannel<C extends TcpChannel<C>> {
        ListenableActionFuture<C> closeAsync();
can we turn this around and accept a listener instead? I really don't like returning futures; it's a design flaw IMO
I'm not sure I understand here. We need something to block on at shutdown. Are you suggesting this?
    PlainActionFuture<C> future = PlainActionFuture.newFuture();
    channel.addCloseListener(future);
    channel.closeAsync();
    future.actionGet();
Or some callback with a CountDownLatch?
    CountDownLatch latch = new CountDownLatch(channels.size());
    for (final C channel : channels) {
        channel.addCloseListener(ActionListener.wrap(latch::countDown));
    }
    latch.await();
I think the simplest option would be this:
    void close(ActionListener<Void> onClosed);
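To make the trade-off in this thread concrete, here is a minimal, self-contained sketch of a listener-based close combined with a CountDownLatch for blocking at shutdown. `SketchChannel`, `closeAllAndWait`, and `demo` are hypothetical names, and a plain `Runnable` stands in for `ActionListener<Void>`; this is an illustration under those assumptions, not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class CloseWithListenerSketch {

    /** Hypothetical stand-in for a transport channel; not the real TcpChannel. */
    static final class SketchChannel {
        private volatile boolean open = true;

        boolean isOpen() {
            return open;
        }

        /** Closes asynchronously and invokes the callback once the channel is closed. */
        void close(Runnable onClosed) {
            Thread closer = new Thread(() -> {
                open = false;
                onClosed.run();
            });
            closer.start();
        }
    }

    /** Blocks the caller (for example at shutdown) until every channel reports closed or the timeout elapses. */
    static boolean closeAllAndWait(List<SketchChannel> channels, long timeoutMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(channels.size());
        for (SketchChannel channel : channels) {
            channel.close(latch::countDown);
        }
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if three channels were all closed within five seconds. */
    static boolean demo() {
        List<SketchChannel> channels = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            channels.add(new SketchChannel());
        }
        try {
            boolean finished = closeAllAndWait(channels, 5_000);
            return finished && channels.stream().noneMatch(SketchChannel::isOpen);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The channel itself never hands out a future here; the only blocking primitive lives in the caller that actually needs to wait.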
    import java.io.IOException;
    public interface TcpChannel<C extends TcpChannel<C>> {
I assume that down the road we will get rid of the generics here and add all the necessary methods to it that TcpTransport needs, right?
    public List<Channel> getChannels() {
        return Arrays.asList(channels);
        return new ArrayList<>(channels);
Can we wrap this in the ctor in a Collections#unmodifiableList and just return the instance instead of a mutable copy?
Yep good idea. I should use the unmodifiableList in a few other places (in NodeChannels) too.
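A small sketch of the suggestion above, assuming a hypothetical `NodeChannelsSketch` holder class (the real `NodeChannels` is not shown in this conversation): the list is copied and wrapped once in the constructor, and the getter returns that same read-only instance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NodeChannelsSketch<C> {
    private final List<C> channels;

    NodeChannelsSketch(List<C> channels) {
        // Copy once and wrap once; later changes to the caller's list do not leak in.
        this.channels = Collections.unmodifiableList(new ArrayList<>(channels));
    }

    /** Returns the same unmodifiable instance on every call, with no per-call copy. */
    List<C> getChannels() {
        return channels;
    }

    /** Returns true if the getter is allocation-free and the view rejects mutation. */
    static boolean demo() {
        List<String> source = new ArrayList<>();
        source.add("a");
        source.add("b");
        NodeChannelsSketch<String> node = new NodeChannelsSketch<>(source);
        boolean sameInstance = node.getChannels() == node.getChannels();
        boolean rejected;
        try {
            node.getChannels().add("c");
            rejected = false;
        } catch (UnsupportedOperationException e) {
            rejected = true;
        }
        return sameInstance && rejected && node.getChannels().size() == 2;
    }
}
```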
    List<PlainChannelFuture<Channel>> pendingChannels = new ArrayList<>(numConnections);
    for (int i = 0; i < numConnections; ++i) {
        try {
            PlainChannelFuture<Channel> pending = initiateChannel(node, connectionProfile.getConnectTimeout());
I don't like that this returns a future. I am almost certain it's simpler if we go with a callback here. I haven't seen a usage of a future where it was actually helpful or simplified anything.
I guess same question as above. Are you imagining something where I pass in a CountDownLatch as a callback or a future as a callback? At some point we need to block while waiting for the connections to be completed (without changing our architecture in a pretty major way).
    /** parse a hostname+port range spec into its equivalent addresses */
    /**
     * parse a hostname+port range spec into its equivalent addresses
autoformat. I'll fix in next revision.
This is ready for another review. I have addressed @s1monw's suggested changes.
s1monw
left a comment
this looks great! good stuff! I left some comments
    import java.io.IOException;
    public interface TcpChannel<C extends TcpChannel<C>> {
        void closeAsync();
I wonder if this can simply implement AutoCloseable and we document that it might be async and the close listeners will be called upon closing.
    @SuppressWarnings("unchecked")
    private void internalAddListener(ActionListener<V> listener) {
        listeners.offer(listener);
we have a race here. We might execute this listener twice. I think we should keep it simple and just use synchronized and make a full copy of the list every time we add a listener. I don't think this is perf critical. Let's make the state only mutable under a mutex; otherwise it's too complicated. That way you can also just use ActionListener#onResponse and ActionListener#onFailure.
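For comparison, the mutex-based alternative proposed in this comment could look roughly like the sketch below. `SynchronizedListenersSketch` is a hypothetical class, `Consumer<V>` stands in for `ActionListener<V>`, and failure handling is omitted; it only illustrates "state mutable only under a mutex, full copy on add".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class SynchronizedListenersSketch<V> {
    private List<Consumer<V>> listeners = Collections.emptyList();
    private boolean done;
    private V result;

    void addListener(Consumer<V> listener) {
        synchronized (this) {
            if (done == false) {
                // Full copy on every add, as suggested; the old list is never mutated.
                List<Consumer<V>> copy = new ArrayList<>(listeners);
                copy.add(listener);
                listeners = copy;
                return;
            }
        }
        // Already completed: notify immediately, outside the lock.
        listener.accept(result);
    }

    void complete(V value) {
        List<Consumer<V>> toNotify;
        synchronized (this) {
            if (done) {
                return;
            }
            done = true;
            result = value;
            toNotify = listeners;
            listeners = Collections.emptyList();
        }
        // Listeners run outside the lock so a slow listener cannot block addListener.
        for (Consumer<V> listener : toNotify) {
            listener.accept(value);
        }
    }

    /** Registers one listener before and one after completion; returns the number of notifications. */
    static int demo() {
        SynchronizedListenersSketch<String> context = new SynchronizedListenersSketch<>();
        AtomicInteger calls = new AtomicInteger();
        context.addListener(v -> calls.incrementAndGet());
        context.complete("closed");
        context.addListener(v -> calls.incrementAndGet());
        return calls.get();
    }
}
```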
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    public class TcpChannelUtils {
can we move these static methods on to TcpChannel? I don't think we need this util class.
    public class TcpChannelUtils {
        public static <C extends TcpChannel<C>> void closeChannel(C channel, boolean blocking) {
can we just delegate to closeChannels with a Collections#singletonList or can we even do it on the caller side?
    ensureOpen();
    try {
        Exception connectionException = null;
        int numConnections = connectionProfile.getNumConnections();
can we assert that this is > 0?
    serverChannels.clear();
    // close all of the incoming channels
    TcpChannelUtils.closeChannels(new ArrayList<>(acceptedChannels), true);
can you leave a comment why we copy this list / set?
    }
    protected void serverAcceptedChannel(Channel channel) {
        if (acceptedChannels.add(channel)) {
I am not sure why we have this conditional. Do we deduplicate? do we have duplicates? I think we should not do this but rather make sure we assert on it or throw a hard exception? I also wonder if we should install the close listener first otherwise it might not be invoked in the case of a race?
I agree that there should be no attempt to add the channel twice. So I added an assertion.
I actually disagree on the other point. The only way to avoid a race is to add the close listener afterwards. Imagine that the channel is already closed. We add a close listener that removes the channel and it is immediately executed. Then we add the channel to the accepted set. Nothing removes it at this point.
Close listeners are always invoked once they are added. Even if they are added after a channel is closed (they are invoked immediately). So it is safe to add the close listener after the channel is added to the accepted set, because it will ensure that the channel is actually removed.
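The ordering argument above can be checked with a small sketch. It assumes a hypothetical `SketchChannel` whose close listeners run immediately when added after close, as described in this reply; `addThenListen` and `listenThenAdd` are illustrative names, not methods from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class AcceptedChannelsSketch {

    /** Hypothetical channel: listeners added after close are invoked immediately. */
    static final class SketchChannel {
        private final List<Runnable> closeListeners = new ArrayList<>();
        private boolean closed;

        void addCloseListener(Runnable listener) {
            synchronized (this) {
                if (closed == false) {
                    closeListeners.add(listener);
                    return;
                }
            }
            listener.run();
        }

        void close() {
            List<Runnable> toRun;
            synchronized (this) {
                if (closed) {
                    return;
                }
                closed = true;
                toRun = new ArrayList<>(closeListeners);
                closeListeners.clear();
            }
            toRun.forEach(Runnable::run);
        }
    }

    private final Set<SketchChannel> accepted = ConcurrentHashMap.newKeySet();

    /** Ordering argued for in the reply: track first, then attach the removal listener. */
    void addThenListen(SketchChannel channel) {
        boolean added = accepted.add(channel);
        assert added : "channel should only be accepted once";
        channel.addCloseListener(() -> accepted.remove(channel));
    }

    /** Reversed ordering: the listener may fire before the channel is tracked. */
    void listenThenAdd(SketchChannel channel) {
        channel.addCloseListener(() -> accepted.remove(channel));
        accepted.add(channel);
    }

    /** Registers an already-closed channel and returns how many channels remain tracked. */
    static int trackedAfterRegisteringClosedChannel(boolean listenerFirst) {
        AcceptedChannelsSketch tracker = new AcceptedChannelsSketch();
        SketchChannel channel = new SketchChannel();
        channel.close();
        if (listenerFirst) {
            tracker.listenThenAdd(channel);
        } else {
            tracker.addThenListen(channel);
        }
        return tracker.accepted.size();
    }
}
```

With the listener attached first, the already-closed channel is removed before it is ever added and then stays in the set; with the add-first ordering nothing is left behind.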
    pendingHandshakes.put(requestId, handler);
    boolean success = false;
    try {
        if (isOpen(channel) == false) {
I love it! :) this is really much better
    import java.util.concurrent.atomic.AtomicBoolean;
    public final class TcpTransportChannel<Channel> implements TransportChannel {
    public final class TcpTransportChannel<Channel extends TcpChannel<Channel>> implements TransportChannel {
at some point we need to get rid of the generics here. I wonder why we still need it but this is a followup...
@s1monw I have made changes based on your feedback. I was able to remove some of the parameterization, but more should be removable in follow-up PRs. I did disagree with this:
Can you articulate the race? I do not think there is one. This pattern is pretty common and I use it in a number of places. I attempt to enqueue something with a concurrent queue. After enqueuing I check a volatile variable to see if this context has been completed. If it has not been completed, this listener will be dealt with by the completing thread. If the context has been completed, this thread will attempt to remove the listener. If the listener is removed, we deal with it and the completing thread will never see it. If the completing thread removes it from the queue first, it will deal with the listener and we can move on. Only one thread can remove it from the queue. Pretty much all of the complicated concurrency work is provided by the Java standard library's concurrent queue.
Additionally, unless you execute all of the listeners in a synchronized block, the logic is still pretty complicated using synchronization. (As an aside, after this PR we are not really using the PlainListenableActionFuture anymore. We could probably remove that, as I think you prefer not to use the future API?)
And you say that this is not a performance-sensitive area of the code, but this is called on the network thread. Using synchronization risks the network thread being parked while waiting for another thread to attach a listener. I would prefer to avoid that risk by using two Java standard library utilities (atomic reference and concurrent queue) and a reasonably easy-to-understand concurrency pattern. I did add a comment to the code explaining what is going on. Thoughts?
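The enqueue-then-recheck pattern described in this reply can be sketched as follows. This is a simplified illustration, not the class from the PR: `CompletionContextSketch` is a hypothetical name, `Consumer<V>` stands in for `ActionListener<V>`, a volatile flag replaces the atomic reference mentioned above, and `complete` is assumed to be called at most once.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class CompletionContextSketch<V> {
    private final ConcurrentLinkedQueue<Consumer<V>> listeners = new ConcurrentLinkedQueue<>();
    private volatile V result;
    private volatile boolean done;

    void addListener(Consumer<V> listener) {
        // Enqueue first, then re-check completion.
        listeners.offer(listener);
        // If completion raced with the enqueue, only the thread that wins the
        // removal from the queue runs the listener, so it runs exactly once.
        if (done && listeners.remove(listener)) {
            listener.accept(result);
        }
    }

    /** Assumed to be called at most once, by the completing thread. */
    void complete(V value) {
        result = value;
        done = true;
        Consumer<V> listener;
        // poll() removes the head, so a listener drained here can no longer
        // be removed (and run) by the adding thread.
        while ((listener = listeners.poll()) != null) {
            listener.accept(value);
        }
    }

    /** Registers one listener before and one after completion; returns the number of notifications. */
    static int demo() {
        CompletionContextSketch<String> context = new CompletionContextSketch<>();
        AtomicInteger calls = new AtomicInteger();
        context.addListener(v -> calls.incrementAndGet());
        context.complete("closed");
        context.addListener(v -> calls.incrementAndGet());
        return calls.get();
    }
}
```

Neither thread ever parks on a lock here, which is the property the reply cares about for the network thread.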
@s1monw Actually it looks like
        listener.onResponse(tcpChannel);
    } else {
        if (throwable instanceof Exception) {
            listener.onFailure((Exception) throwable);
I don't think we should hide the non-exception. This will be an error, and in this case we need to rethrow it? @jasontedor WDYT?
     * @param <Response> the type of the response
     * @return a listener that listens for responses and invokes the runnable when received
     */
    static <Response> ActionListener<Response> wrap(Runnable runnable) {
maybe just return wrap(r -> runnable.run(), e -> runnable.run());?
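A self-contained sketch of that delegation, using a hypothetical `SketchListener` interface in place of `ActionListener` (whose full definition is not part of this conversation): the `Runnable` overload simply forwards to the two-callback overload, so the runnable fires on both success and failure.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical stand-in for ActionListener, reduced to the two callbacks. */
interface SketchListener<R> {
    void onResponse(R response);

    void onFailure(Exception e);

    static <R> SketchListener<R> wrap(Consumer<R> onResponse, Consumer<Exception> onFailure) {
        return new SketchListener<R>() {
            @Override
            public void onResponse(R response) {
                onResponse.accept(response);
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }

    /** Runs the same runnable whether the operation succeeded or failed. */
    static <R> SketchListener<R> wrap(Runnable runnable) {
        return wrap(r -> runnable.run(), e -> runnable.run());
    }
}

final class WrapDemo {
    /** Triggers one response and one failure; returns how often the runnable ran. */
    static int run() {
        AtomicInteger calls = new AtomicInteger();
        SketchListener<String> listener = SketchListener.wrap(calls::incrementAndGet);
        listener.onResponse("ok");
        listener.onFailure(new RuntimeException("boom"));
        return calls.get();
    }
}
```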
You are right, this is correct. I didn't anticipate / overlooked that poll removes. Yet, I like the solution you have now better :)
jasontedor
left a comment
@s1monw did the heavy lifting in the review phase here but this LGTM too.
Right now our different transport implementations must duplicate functionality in order to stay compliant with the requirements of TcpTransport. They must all implement common logic to open channels, close channels, keep track of channels for eventual shutdown, etc. Additionally, there is a weird and complicated relationship between Transport and TransportService. We eventually want to start merging some of the functionality between these classes.
This commit starts moving towards a world where TransportService retains all the application logic and channel state. Transport implementations in this world will only be tasked with returning a channel when one is requested, calling transport service when a channel is accepted from a server, and starting / stopping itself.
Specifically this commit changes how channels are opened and closed. All Transport implementations now return a channel type that must comply with the new TcpChannel interface. This interface has the methods necessary for TcpTransport to completely manage the lifecycle of a channel. This includes setting the channel up, waiting for connection, adding close listeners, and eventually closing.
This commit is a follow up to the work completed in elastic#27132. Essentially it transitions two more methods (sendMessage and getLocalAddress) from Transport to TcpChannel. With this change, there is no longer a need for TcpTransport to be aware of the specific type of channel a transport returns. So that class is no longer parameterized by channel type.
This is a follow up to elastic#27132. As that PR greatly simplified the connection logic inside a low level transport implementation, much of the functionality provided by the NioClient class is no longer necessary. This commit removes that class.
* master:
  Stop skipping REST test after backport of #27056
  Fix default value of ignore_unavailable for snapshot REST API (#27056)
  Add composite aggregator (#26800)
  Fix `ShardSplittingQuery` to respect nested documents. (#27398)
  [Docs] Restore section about multi-level parent/child relation in parent-join (#27392)
  Add TcpChannel to unify Transport implementations (#27132)
  Add note on plugin distributions in plugins folder
  Remove implementations of `TransportChannel` (#27388)
  Update Google SDK to version 1.23 (#27381)
  Fix Gradle 4.3.1 compatibility for logging (#27382)
  [Test] Change Elasticsearch startup timeout to 120s in packaging tests
  Docs/windows installer (#27369)