Register data node stats from info carried back in search responses #25430
dakrone merged 29 commits into elastic:master
Conversation
This is part of elastic#24915, where we now calculate the EWMA of service time for tasks in the search threadpool and send it, along with the current queue size, back to the coordinating node. The coordinating node now tracks this information for each node in the cluster. In the future, this information will be used to determine the best replica to route a search request to. This change has no user-visible difference.
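For readers unfamiliar with the term, an exponentially weighted moving average (EWMA) folds each new sample into a running average with a fixed weight. The following is a minimal sketch of that update rule, not the class used in this PR: `SimpleEwma` and its method names are hypothetical, and the weight of 0.3 in the usage note is borrowed from the `ALPHA` constant quoted later in the review.

```java
// Hypothetical illustration of an EWMA; not the Elasticsearch implementation.
final class SimpleEwma {
    private final double alpha;   // weight given to the newest sample, in [0, 1]
    private double average;       // current smoothed value

    SimpleEwma(double alpha, double initialAverage) {
        if (alpha < 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in [0, 1], got " + alpha);
        }
        this.alpha = alpha;
        this.average = initialAverage;
    }

    // Fold one sample into the average: new = alpha * sample + (1 - alpha) * old.
    synchronized void addValue(double sample) {
        average = alpha * sample + (1.0 - alpha) * average;
    }

    synchronized double getAverage() {
        return average;
    }
}
```

With a weight of 0.3, an average that starts at 100 and receives a sample of 200 moves to 0.3 * 200 + 0.7 * 100 = 130, so recent service times dominate without a single slow task swamping the history.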
retest this please
s1monw left a comment
left a bunch of comments, thanks lee for the effort!
successfulOps.incrementAndGet();
long responseDuration = System.nanoTime() - startNanos;
String nodeId = result.getSearchShardTarget().getNodeId();
if (nodeId != null && collectorService != null) {
why can any of this be null? this looks like a bug?!
why do we do this in here and not in the query wrapper. This action might run some DFS stats rather than queries so I think we should capture it solely in the ResponseListenerWrapper?
if we only capture it in the wrapper we don't need the dependency here no?
 * result to get the piggybacked queue size and service time EWMA, adding those
 * values to the coordinating nodes' {@code ResponseCollectorService}.
 */
public class ResponseListenerWrapper extends SearchActionListener<SearchPhaseResult> {
I think this one can just be an action listener, we can use the searchShardTarget from the delegate listener. there is no need to set the targets twice etc.
I also wonder if we can find a better name for this
 */
public class ResponseListenerWrapper extends SearchActionListener<SearchPhaseResult> {

private static final Logger logger = ESLoggerFactory.getLogger(ResponseListenerWrapper.class);
let's not add logger here. if it's only for debugging we can add it when it's needed
if (queryResult != null && collector != null) {
long ewma = queryResult.serviceTimeEWMA();
int queueSize = queryResult.nodeQueueSize();
String nodeId = listener.searchShardTarget.getNodeId();
this should never be null. Fail if it is, maybe in the ctor already
public ResponseListenerWrapper(SearchActionListener<SearchPhaseResult> listener,
ResponseCollectorService collector) {
super(listener.searchShardTarget, listener.requestIndex);
maybe we just expect an ActionListener and a node ID string or, even better, a DiscoveryNode here. We can get it directly from the connection via Connection#getNode(), which works with CCS as well; then we can reduce visibility again on SearchActionListener
private final int requestIndex;
private final SearchShardTarget searchShardTarget;

protected final int requestIndex;
see comment above regarding visibility
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
modules.add(b -> {
b.bind(ResponseCollectorService.class).toInstance(responseCollectorService);
I don't think we should do this! Please, let's not add any more Guice-related stuff, no matter what it costs.
private static final double ALPHA = 0.3;

private final Map<String, ExponentiallyWeightedMovingAverage> nodeIdToQueueSize;
Can we maybe just have a single map and then sync on the node ID with using some KeyedLock? I think that would be simpler?
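The single-map idea can be sketched roughly as follows. This is an illustration under stated assumptions, not the code that was merged: it swaps in `ConcurrentHashMap.compute` for per-key atomic updates instead of Elasticsearch's `KeyedLock`, and `NodeStatsRegistry`, `Stats`, and `record` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one map keyed by node ID instead of one map per metric.
final class NodeStatsRegistry {
    static final double ALPHA = 0.3; // same weight as the ALPHA constant quoted above

    // Immutable pair of smoothed values for one node.
    static final class Stats {
        final double queueSizeEwma;
        final double serviceTimeEwma;

        Stats(double queueSizeEwma, double serviceTimeEwma) {
            this.queueSizeEwma = queueSizeEwma;
            this.serviceTimeEwma = serviceTimeEwma;
        }
    }

    private final Map<String, Stats> statsByNode = new ConcurrentHashMap<>();

    // compute() runs atomically per key, so both metrics for a node
    // are always updated together without an explicit lock.
    void record(String nodeId, int queueSize, long serviceTimeNanos) {
        statsByNode.compute(nodeId, (id, old) -> old == null
            ? new Stats(queueSize, serviceTimeNanos)
            : new Stats(ewma(old.queueSizeEwma, queueSize),
                        ewma(old.serviceTimeEwma, serviceTimeNanos)));
    }

    // Returns null when nothing has been recorded for the node.
    Stats get(String nodeId) {
        return statsByNode.get(nodeId);
    }

    // Would be called when a node leaves the cluster.
    void remove(String nodeId) {
        statsByNode.remove(nodeId);
    }

    private static double ewma(double previous, double sample) {
        return ALPHA * sample + (1.0 - ALPHA) * previous;
    }
}
```

Keeping both metrics in one value object means a reader never sees a queue size from one response paired with a service time from another.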
this.queryResult = new QuerySearchResult(id, shardTarget);
this.fetchResult = new FetchSearchResult(id, shardTarget);
this.indexShard = indexShard;
this.searchExecutor = indexShard.getThreadPool().executor(ThreadPool.Names.SEARCH);
can we do this on the consumer end and specialize the code in SearchService? I think this is where it's used. We shouldn't really add methods to this class it's big enough
I don't think I understand, I need a way from QueryPhase.execute to access the executor to get the stats out of it, I can't do it on the consumer end since that may be on a different node, and the only thing available in the (static) QueryPhase.execute method is the SearchContext.
Is there a better place to put this?
what I mean is instead of having the searchExecutor member can't we do this in QueryPhase.execute instead so we don't add more than necessary to the search context. it's already huge and we can access indexShard?
That makes sense and is much cleaner, thanks!
 * Test that response stats are collected for queries executed on a real node
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ResponseCollectorServiceIT extends ESIntegTestCase {
instead of doing these old style integration tests should we rather add unit tests to do this? I added so many for all the search actions. Let's test all the things in isolation, like the wrapper, the query phase etc.
…nListener Also removes the logger
retest this please
@s1monw thanks for taking a look! I've pushed commits addressing all of your feedback (except for the one I left a question about), could you take another look?
s1monw left a comment
left some minors looks close!
private final int requestIndex;
private final SearchShardTarget searchShardTarget;
protected final SearchShardTarget searchShardTarget;
can you add a getter instead of making this protected?
given my comment below I think this is unneeded and can be private without a getter
private final ResponseCollectorService collector;
private final long startNanos;

public SearchExecutionStatsCollector(SearchActionListener<SearchPhaseResult> listener,
please instead of exposing the searchShardTarget from SearchActionListener can you add a 3rd arg that is the nodeID. we can get it from the passed in connection we talk to instead. then SearchActionListener can also be a ActionListener instead
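The shape being asked for here, a plain listener wrapper that receives the node ID as a constructor argument and fails fast on nulls, could look roughly like the sketch below. Everything in it is a stand-in: `Listener`, `ShardResult`, `StatsSink`, and `StatsCollectingListener` are hypothetical types invented so the example compiles on its own, and the order in which stats are recorded and the delegate is notified is an assumption.

```java
import java.util.Objects;

// Hypothetical stand-in for ActionListener.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

// Hypothetical stand-in for a result carrying the piggybacked node stats.
final class ShardResult {
    final long serviceTimeEwmaNanos;
    final int queueSize;

    ShardResult(long serviceTimeEwmaNanos, int queueSize) {
        this.serviceTimeEwmaNanos = serviceTimeEwmaNanos;
        this.queueSize = queueSize;
    }
}

// Hypothetical stand-in for the collector service on the coordinating node.
interface StatsSink {
    void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long serviceTimeEwmaNanos);
}

final class StatsCollectingListener implements Listener<ShardResult> {
    private final Listener<ShardResult> delegate;
    private final StatsSink sink;
    private final String nodeId;
    private final long startNanos;

    StatsCollectingListener(Listener<ShardResult> delegate, StatsSink sink, String nodeId) {
        // Fail in the constructor rather than null-checking on every response.
        this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
        this.sink = Objects.requireNonNull(sink, "sink must not be null");
        this.nodeId = Objects.requireNonNull(nodeId, "nodeId must not be null");
        this.startNanos = System.nanoTime();
    }

    @Override
    public void onResponse(ShardResult response) {
        long responseTimeNanos = System.nanoTime() - startNanos;
        sink.addNodeStatistics(nodeId, response.queueSize, responseTimeNanos, response.serviceTimeEwmaNanos);
        delegate.onResponse(response);
    }

    @Override
    public void onFailure(Exception e) {
        // Failures carry no stats; just pass them through.
        delegate.onFailure(e);
    }
}
```

Because the node ID arrives as an argument, the wrapper no longer needs to read `searchShardTarget` from the listener it wraps, which is what allows that field to stay private.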
 * result to get the piggybacked queue size and service time EWMA, adding those
 * values to the coordinating nodes' {@code ResponseCollectorService}.
 */
public class SearchExecutionStatsCollector implements ActionListener<SearchPhaseResult> {
can this be package-private and final?
public SearchTransportService(Settings settings, TransportService transportService) {
public SearchTransportService(Settings settings, TransportService transportService,
ResponseCollectorService responseCollectorService) {
you can also remove the dependency by passing a BiFunction<Connection, SearchActionListener, ActionListener> instead of ResponseCollectorService and require it to be non-null then it's easier to test? and we never need to do nullchecks
I played with this, however, it requires making SearchExecutionStatsCollector public instead of package private, since we need to construct it within the BiFunction passed in to SearchTransportService in Node.
I think this is what you had in mind, but let me know if I'm mistaken:
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java
index 031e860..96d5e18 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java
@@ -23,8 +23,10 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.query.QuerySearchResult;
+import org.elasticsearch.transport.Transport;
import java.util.Objects;
+import java.util.function.BiFunction;
/**
* A wrapper of search action listeners (search results) that unwraps the query
@@ -47,6 +49,10 @@ final class SearchExecutionStatsCollector implements ActionListener<SearchPhaseR
this.nodeId = nodeId;
}
+ public static BiFunction<Transport.Connection, SearchActionListener, ActionListener> makeWrapper(ResponseCollectorService service) {
+ return (connection, listener) -> new SearchExecutionStatsCollector(listener, service, connection.getNode().getId());
+ }
+
@Override
public void onResponse(SearchPhaseResult response) {
QuerySearchResult queryResult = response.queryResult();
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
index 2587623..3ad1517 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
/**
@@ -77,12 +78,13 @@ public class SearchTransportService extends AbstractComponent {
private final TransportService transportService;
private final ResponseCollectorService responseCollectorService;
+ private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
public SearchTransportService(Settings settings, TransportService transportService,
- ResponseCollectorService responseCollectorService) {
+ BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper) {
super(settings);
this.transportService = transportService;
- this.responseCollectorService = responseCollectorService;
+ this.responseWrapper = responseWrapper;
}
public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
@@ -123,12 +125,7 @@ public class SearchTransportService extends AbstractComponent {
final boolean fetchDocuments = request.numberOfShards() == 1;
Supplier<SearchPhaseResult> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
- final ActionListener handler;
- if (responseCollectorService == null) {
- handler = listener;
- } else {
- handler = new SearchExecutionStatsCollector(listener, responseCollectorService, connection.getNode().getId());
- }
+ final ActionListener handler = responseWrapper.apply(connection, listener);
transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
new ActionListenerResponseHandler<>(handler, supplier));
}
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index bb2264f..b10f645 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -29,6 +29,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.GenericAction;
+import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.TransportAction;
@@ -424,8 +425,8 @@ public class Node implements Closeable {
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
- final SearchTransportService searchTransportService = new SearchTransportService(settings,
- transportService, responseCollectorService);
+ final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,
+ SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final Consumer<Binder> httpBind;
final HttpServerTransport httpServerTransport;
if (networkModule.isHttpEnabled()) {
I played with this, however, it requires making SearchExecutionStatsCollector public instead of package private, since we need to construct it within the BiFunction passed in to SearchTransportService in Node.
I think that is ok!
 * tasks executed on each node, making the EWMA of the values available to the
 * coordinating node.
 */
public class ResponseCollectorService extends AbstractComponent implements ClusterStateListener {
});
}

public Map<String, ComputedNodeStats> getAllNodeStatistics() {
this seems to be test only can we move it into a util in tests?
I'm planning on using this method for the very next PR that implements the actual ranking algorithm, without this method the NodeStatistics values would be exposed, which is the mutable one. By putting this method here NodeStatistics can be private and we can expose only the immutable ComputedNodeStats
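The split described here, a private mutable record for internal accounting and an immutable snapshot for callers, can be sketched like this. It is an illustration only: `StatsService`, `MutableStats`, and `Snapshot` are hypothetical names standing in for `ResponseCollectorService`, `NodeStatistics`, and `ComputedNodeStats`, and the coarse `synchronized` methods are a simplification.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "private mutable state, public immutable view".
final class StatsService {

    // Immutable view handed to callers.
    static final class Snapshot {
        final String nodeId;
        final double queueSize;
        final double serviceTime;

        Snapshot(String nodeId, double queueSize, double serviceTime) {
            this.nodeId = nodeId;
            this.queueSize = queueSize;
            this.serviceTime = serviceTime;
        }
    }

    // Mutable record that never leaves this class.
    private static final class MutableStats {
        double queueSize;
        double serviceTime;
    }

    private final Map<String, MutableStats> statsByNode = new HashMap<>();

    synchronized void record(String nodeId, double queueSize, double serviceTime) {
        MutableStats stats = statsByNode.computeIfAbsent(nodeId, id -> new MutableStats());
        stats.queueSize = queueSize;
        stats.serviceTime = serviceTime;
    }

    // Copies current values into immutable snapshots; later updates
    // do not affect a map that was already returned.
    synchronized Map<String, Snapshot> getAllNodeStatistics() {
        Map<String, Snapshot> copy = new HashMap<>();
        statsByNode.forEach((id, s) -> copy.put(id, new Snapshot(id, s.queueSize, s.serviceTime)));
        return Collections.unmodifiableMap(copy);
    }
}
```

A ranking algorithm that consumes the returned map can then work on a stable view while responses keep updating the internal records.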
 * node's statistics. This includes the EWMA of queue size, response time,
 * and service time.
 */
public class ComputedNodeStats {
can this inner class be static and final and private
 * time, and service time, however, this class is private and intended only
 * to be used for the internal accounting of {@code ResponseCollectorService}.
 */
private class NodeStatistics {
 * to be used for the internal accounting of {@code ResponseCollectorService}.
 */
private class NodeStatistics {
public final String nodeId;
serviceTime is non final?! is that expected? just curious...
Thanks for the comments @s1monw! I added more changes based on your feedback
s1monw left a comment
please apply your proposed patch other than that LGTM
Thanks for all the reviews @s1monw!