Conversation
s1monw
left a comment
There was a problem hiding this comment.
looks great, much simpler! I left a bunch of comments.
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.common.component.AbstractComponent; | ||
| import org.elasticsearch.common.inject.Inject; |
| () -> UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "#", | ||
| UNICAST_NODE_PREFIX, | ||
| resolveTimeout); | ||
| } catch (InterruptedException e) { |
There was a problem hiding this comment.
please restore the interrupt status here?
There was a problem hiding this comment.
this is how it was and we do throw an exception, thus processing the interrupt?
| final AbstractRunnable pingSender = new AbstractRunnable() { | ||
| @Override | ||
| public void onFailure(Exception e) { | ||
| if (e instanceof AlreadyClosedException) { |
There was a problem hiding this comment.
maybe (e instanceof AlreadyClosedException) == false?
There was a problem hiding this comment.
yep. This morphed - I used to have a log there but got annoyed with it (just noise).
| public PingCollection pingCollection() { | ||
| return pingCollection; | ||
| public List<DiscoveryNode> getSeedNodes() { | ||
| checkIfClosed(); |
There was a problem hiding this comment.
we call it ensureOpen everywhere can we do the same here?
There was a problem hiding this comment.
good one. Will change.
| } | ||
| } | ||
|
|
||
| public synchronized Connection addConnectionIfNeeded(TransportAddress address, Connection newConnection) { |
There was a problem hiding this comment.
hmm that looks weird. Can we maybe use a KeyedLock when we open the connections with IP and port or something like this?
There was a problem hiding this comment.
yeah, I wanted to have the simplest construct as it was a rare collision. With the latest code I actually think it's impossible (I dedup on addresses and the connection are private to the pinging round). Will remove.
There was a problem hiding this comment.
turns out we do need this protection or something similar. I took another approach, which I think you'd like better.
| } | ||
|
|
||
| if (connection == null) { | ||
| logger.trace("[{}] connecting (light) to {}", pingingRound.id(), node); |
There was a problem hiding this comment.
do we need this trace log here and if so can we fix it to say temporarily or something like this
There was a problem hiding this comment.
I adapted the log message
| } finally { | ||
| latch.countDown(); | ||
| logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses)); | ||
| if (pingingRound.isClosed() == false) { |
There was a problem hiding this comment.
just flip it then you don't need to negate
| } | ||
| } finally { | ||
| latch.countDown(); | ||
| logger.trace("[{}] received response from {}: {}", pingingRound.id(), node, Arrays.toString(response.pingResponses)); |
There was a problem hiding this comment.
if you keep the trace maybe use a logging guard here?
There was a problem hiding this comment.
sure thing, will add.
| */ | ||
| public static ConnectionProfile getLightProfileWithTimeout(@Nullable TimeValue connectTimeout, | ||
| @Nullable TimeValue handshakeTimeout) { | ||
| return new ConnectionProfile( |
There was a problem hiding this comment.
I wonder if we should do this. I think we should move the LIGHT_PROFILE into tests somewhere and then require every special use to build it's own. The problem I have here is that the getLightProfileWithTimeout shares one connection across all uses. I think in the case of ping we should only use 1 connection for PING and 0 for the others. that will cause an exception if it's used in a wrong context. makes sense?
There was a problem hiding this comment.
I tried to implement your suggestion and I think it looks good. will push shortly.
| * @return the connected node | ||
| * @throws ConnectTransportException if the connection failed | ||
| * @throws IllegalStateException if the handshake failed | ||
| */ |
|
@s1monw I pushed more commits addressing your feedback. Let me know what you think. |
| import static org.mockito.Mockito.verify; | ||
| import static org.mockito.Mockito.verifyNoMoreInteractions; | ||
|
|
||
| @TestLogging("org.elasticsearch.transport:TRACE,org.elasticsearch.discovery.zen:TRACE") |
There was a problem hiding this comment.
This logging was initially added to just testSimplePings to chase a race. The race has not reproduced since adding this logging. I think that we should drop the logging and and then address if the race comes back since you've changed how these things are handled.
s1monw
left a comment
There was a problem hiding this comment.
left some minors LGTM otherwise
| public Connection getOrConnect(DiscoveryNode node) throws IOException { | ||
| Connection result; | ||
| try (Releasable ignore = connectionLock.acquire(node.getAddress())) { | ||
| result = tempConnections.get(node.getAddress()); |
There was a problem hiding this comment.
maybe use computeIfAbsent()?
There was a problem hiding this comment.
the problem is the IOException that can be thrown while making a connection.
| final Map<TransportAddress, DiscoveryNode> uniqueNodesByAddress = | ||
| Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream()) | ||
| .collect(Collectors.toMap(DiscoveryNode::getAddress, n -> n, (n1, n2) -> n1)); | ||
| .collect(Collectors.toMap(DiscoveryNode::getAddress, node -> node, (n1, n2) -> n1)); |
There was a problem hiding this comment.
you didn't like Function.identity() ?
There was a problem hiding this comment.
I did but I used it wrong (as a function reference). Using it right works of course .. zzzz
| } | ||
| } else { | ||
| logger.trace("[{}] skipping received response from {}. already closed", pingingRound.id(), node); | ||
| Arrays.asList(response.pingResponses).forEach(pingingRound::addPingResponseToCollection); |
There was a problem hiding this comment.
Arrays.asStream(response.pingResponses) would not materialize it
43f8287 to
8959ae6
Compare
|
thx @s1monw. I'll wait a day before backporting |
* master: Simplify Unicast Zen Ping (elastic#22277) Replace IndicesQueriesRegistry (elastic#22289) Fixed document mistake and fit for 5.1.1 API [TEST] improve error message in ESTestCase#assertWarnings [TEST] remove deleted test classes from checkstyle suppressions [TEST] make ESSingleNodeTestCase tests repeatable (elastic#22283) Link for setting page in elasticsearch.yml is outdated Factor out sort values from InternalSearchHit (elastic#22080) Add ID for percolate query to Java API docs x_refresh.yaml tests should use unique index names and doc ids to ease debugging IndicesStoreIntegrationIT should not use start recovery sending as an indication that the recovery started Added base class for testing aggregators and some initial tests for `terms`, `top_hits` and `min` aggregations. Add link to foreach processor to ingest-attachment.asciidoc
…r being closed This may cause them to leak. Provisioning for it was made in #22277 but sadly a crucial ensureOpen call was forgotten
…otification task
Not doing this made it difficult to establish a happens before relationship between connecting to a node and adding a listeners. Causing test code like this to fail sproadically:
```
// connection to reuse
handleA.transportService.connectToNode(handleB.node);
// install a listener to check that no new connections are made
handleA.transportService.addConnectionListener(new TransportConnectionListener() {
@OverRide
public void onConnectionOpened(DiscoveryNode node) {
fail("should not open any connections. got [" + node + "]");
}
});
```
relates to #22277
The `UnicastZenPing` shows it's age and is the result of many small changes. The current state of affairs is confusing and is hard to reason about. This PR cleans it up (while following the same original intentions). Highlights of the changes are: 1) Clear 3 round flow - no interleaving of scheduling. 2) The previous implementation did a best effort attempt to wait for ongoing pings to be sent and completed. The pings were guaranteed to complete because each used the total ping duration as a timeout. This did make it hard to reason about the total ping duration and the flow of the code. All of this is removed now and ping should just complete within the given duration or not be counted (note that it was very handy for testing, but I move the needed sync logic to the test). 3) Because of (2) the pinging scheduling changed a bit, to give a chance for the last round to complete. We now ping at the beginning, 1/3 and 2/3 of the duration. 4) To offset for (3) a bit, incoming ping requests are now added to on going ping collections. 5) UnicastZenPing never establishes full blown connections (but does reuse them if there). Relates to #22120 6) Discovery host providers are only used once per pinging round. Closes #21739 7) Usage of the ability to open a connection without connecting to a node ( #22194 ) and shorter connection timeouts helps with connections piling up. Closes #19370 8) Beefed up testing and sped them up. 9) removed light profile from production code
…r being closed This may cause them to leak. Provisioning for it was made in #22277 but sadly a crucial ensureOpen call was forgotten
…otification task
Not doing this made it difficult to establish a happens before relationship between connecting to a node and adding a listeners. Causing test code like this to fail sproadically:
```
// connection to reuse
handleA.transportService.connectToNode(handleB.node);
// install a listener to check that no new connections are made
handleA.transportService.addConnectionListener(new TransportConnectionListener() {
@OverRide
public void onConnectionOpened(DiscoveryNode node) {
fail("should not open any connections. got [" + node + "]");
}
});
```
relates to #22277
|
this is now backported to 5.x as well. |
The
UnicastZenPingshows it's age and is the result of many small changes. The current state of affairs is confusing and is hard to reason about. This PR cleans it up (while following the same original intentions). Highlights of the changes are: