Skip to content

Register data node stats from info carried back in search responses#25430

Merged
dakrone merged 29 commits intoelastic:masterfrom
dakrone:register-adaptive-selection-info
Jul 17, 2017
Merged

Register data node stats from info carried back in search responses#25430
dakrone merged 29 commits intoelastic:masterfrom
dakrone:register-adaptive-selection-info

Conversation

@dakrone
Copy link
Copy Markdown
Member

@dakrone dakrone commented Jun 27, 2017

This is part of #24915, where we now calculate the EWMA of service time for
tasks in the search threadpool, and send that as well as the current queue size
back to the coordinating node. The coordinating node now tracks this information
for each node in the cluster.

This information will be used in the future the determining the best replica a
search request should be routed to. This change has no user-visible difference.

This is part of elastic#24915, where we now calculate the EWMA of service time for
tasks in the search threadpool, and send that as well as the current queue size
back to the coordinating node. The coordinating node now tracks this information
for each node in the cluster.

This information will be used in the future the determining the best replica a
search request should be routed to. This change has no user-visible difference.
@dakrone
Copy link
Copy Markdown
Member Author

dakrone commented Jun 27, 2017

retest this please

Copy link
Copy Markdown
Contributor

@s1monw s1monw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left a bunch of comments, thanks lee for the effort!

successfulOps.incrementAndGet();
long responseDuration = System.nanoTime() - startNanos;
String nodeId = result.getSearchShardTarget().getNodeId();
if (nodeId != null && collectorService != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can any of this be null? this looks like a bug?!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do this in here and not in the query wrapper. This action might run some DFS stats rather than queries so I think we should capture it solely in the ResponseListenerWrapper?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we only capture it in the wrapper we don't need the dependency here no?

* result to get the piggybacked queue size and service time EWMA, adding those
* values to the coordinating nodes' {@code ResponseCollectorService}.
*/
public class ResponseListenerWrapper extends SearchActionListener<SearchPhaseResult> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this one can just be an action listener, we can use the searchShardTarget from the delegate listener. there is no need to set the targets twice etc.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if we can find a better name for this

*/
public class ResponseListenerWrapper extends SearchActionListener<SearchPhaseResult> {

private static final Logger logger = ESLoggerFactory.getLogger(ResponseListenerWrapper.class);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not add logger here. if it's only for debugging we can add it when it's needed

if (queryResult != null && collector != null) {
long ewma = queryResult.serviceTimeEWMA();
int queueSize = queryResult.nodeQueueSize();
String nodeId = listener.searchShardTarget.getNodeId();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should never be null. Fail if it is, maybe in the ctor already


public ResponseListenerWrapper(SearchActionListener<SearchPhaseResult> listener,
ResponseCollectorService collector) {
super(listener.searchShardTarget, listener.requestIndex);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we just expect an Actionlistener and a node ID string or even better a DiscoveryNode here. We can get it directly from the connection via Connection#getNode() which works with CCS as well then we can reduce visibility again on SearchActionListener

private final int requestIndex;
private final SearchShardTarget searchShardTarget;

protected final int requestIndex;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above regarding visibility

transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
modules.add(b -> {
b.bind(ResponseCollectorService.class).toInstance(responseCollectorService);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should do this! Let's please don't add any guice related stuff anymore no matter what it costs.


private static final double ALPHA = 0.3;

private final Map<String, ExponentiallyWeightedMovingAverage> nodeIdToQueueSize;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we maybe just have a single map and then sync on the node ID with using some KeyedLock? I think that would be simpler?

this.queryResult = new QuerySearchResult(id, shardTarget);
this.fetchResult = new FetchSearchResult(id, shardTarget);
this.indexShard = indexShard;
this.searchExecutor = indexShard.getThreadPool().executor(ThreadPool.Names.SEARCH);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do this on the consumer end and specialize the code in SearchService? I think this is where it's used. We shouldn't really add methods to this class it's big enough

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I understand, I need a way from QueryPhase.execute to access the executor to get the stats out of it, I can't do it on the consumer end since that may be on a different node, and the only thing available in the (static) QueryPhase.execute method is the SearchContext.

Is there a better place to put this?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what I mean is instead of having the searchExecutor member can't we do this in QueryPhase.execute instead so we don't add more than necessary to the search context. it's already huge and we can access indexShard?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense and is much cleaner, thanks!

* Test that response stats are collected for queries executed on a real node
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class ResponseCollectorServiceIT extends ESIntegTestCase {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of doing these old style integration tests should we rather add unittest to do this. I added so many for all the search actions. Let's test all the things in isolation like the wrapper, the query phase etc.

@dakrone
Copy link
Copy Markdown
Member Author

dakrone commented Jul 11, 2017

retest this please

@dakrone
Copy link
Copy Markdown
Member Author

dakrone commented Jul 12, 2017

@s1monw thanks for taking a look! I've pushed commits addressing all of your feedback (except for the one I left a question about), could you take another look?

@dakrone dakrone requested a review from s1monw July 12, 2017 17:07
Copy link
Copy Markdown
Contributor

@s1monw s1monw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some minors looks close!


private final int requestIndex;
private final SearchShardTarget searchShardTarget;
protected final SearchShardTarget searchShardTarget;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a getter instead of making this protected?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given my comment below I think this is unneeded and can be private without a getter

private final ResponseCollectorService collector;
private final long startNanos;

public SearchExecutionStatsCollector(SearchActionListener<SearchPhaseResult> listener,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please instead of exposing the searchShardTarget from SearchActionListener can you add a 3rd arg that is the nodeID. we can get it from the passed in connection we talk to instead. then SearchActionListener can also be a ActionListener instead

* result to get the piggybacked queue size and service time EWMA, adding those
* values to the coordinating nodes' {@code ResponseCollectorService}.
*/
public class SearchExecutionStatsCollector implements ActionListener<SearchPhaseResult> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be pkg privaate and final?


public SearchTransportService(Settings settings, TransportService transportService) {
public SearchTransportService(Settings settings, TransportService transportService,
ResponseCollectorService responseCollectorService) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can also remove the dependency by passing a BiFunction<Connection, SearchActionListener, ActionListener> instead of ResponseCollectorService and require it to be non-null then it's easier to test? and we never need to do nullchecks

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played with this, however, it requires making SearchExecutionStatsCollector public instead of package private, since we need to construct it within the BiFunction passed in to SearchTransportService in Node.

I think this is what you had in mind, but let me know if I'm mistaken:

diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java                                                   
index 031e860..96d5e18 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchExecutionStatsCollector.java
@@ -23,8 +23,10 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.node.ResponseCollectorService;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.query.QuerySearchResult;
+import org.elasticsearch.transport.Transport;
 
 import java.util.Objects;
+import java.util.function.BiFunction;
 
 /**
  * A wrapper of search action listeners (search results) that unwraps the query
@@ -47,6 +49,10 @@ final class SearchExecutionStatsCollector implements ActionListener<SearchPhaseR
         this.nodeId = nodeId;
     }
 
+    public static BiFunction<Transport.Connection, SearchActionListener, ActionListener> makeWrapper(ResponseCollectorService service) {                                                                                                    
+        return (connection, originalListener) -> new SearchExecutionStatsCollector(listener, service, connection.getNode().getId());                                                                                                        
+    }
+
     @Override
     public void onResponse(SearchPhaseResult response) {
         QuerySearchResult queryResult = response.queryResult();
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java                                                                 
index 2587623..3ad1517 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -56,6 +56,7 @@ import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.function.BiFunction;
 import java.util.function.Supplier;
 
 /**
@@ -77,12 +78,13 @@ public class SearchTransportService extends AbstractComponent {
 
     private final TransportService transportService;
     private final ResponseCollectorService responseCollectorService;
+    private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
 
     public SearchTransportService(Settings settings, TransportService transportService,
-                                  ResponseCollectorService responseCollectorService) {
+                                  BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper) {                                                                                                                 
         super(settings);
         this.transportService = transportService;
-        this.responseCollectorService = responseCollectorService;
+        this.responseWrapper = responseWrapper;
     }
 
     public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {                                                                                                                   
@@ -123,12 +125,7 @@ public class SearchTransportService extends AbstractComponent {
         final boolean fetchDocuments = request.numberOfShards() == 1;
         Supplier<SearchPhaseResult> supplier = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;
 
-        final ActionListener handler;
-        if (responseCollectorService == null) {
-            handler = listener;
-        } else {
-            handler = new SearchExecutionStatsCollector(listener, responseCollectorService, connection.getNode().getId());                                                                                                                  
-        }
+        final ActionListener handler = responseWrapper.apply(connection, listener);
         transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
             new ActionListenerResponseHandler<>(handler, supplier));
     }
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index bb2264f..b10f645 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -29,6 +29,7 @@ import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionModule;
 import org.elasticsearch.action.GenericAction;
+import org.elasticsearch.action.search.SearchExecutionStatsCollector;
 import org.elasticsearch.action.search.SearchPhaseController;
 import org.elasticsearch.action.search.SearchTransportService;
 import org.elasticsearch.action.support.TransportAction;
@@ -424,8 +425,8 @@ public class Node implements Closeable {
             final TransportService transportService = newTransportService(settings, transport, threadPool,
                 networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
             final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);                                                                                                          
-            final SearchTransportService searchTransportService =  new SearchTransportService(settings,
-                transportService, responseCollectorService);
+            final SearchTransportService searchTransportService =  new SearchTransportService(settings, transportService,                                                                                                                   
+                    SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
             final Consumer<Binder> httpBind;
             final HttpServerTransport httpServerTransport;
             if (networkModule.isHttpEnabled()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this looks good to me

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played with this, however, it requires making SearchExecutionStatsCollector public instead of package private, since we need to construct it within the BiFunction passed in to SearchTransportService in Node.

I think that is ok!

* tasks executed on each node, making the EWMA of the values available to the
* coordinating node.
*/
public class ResponseCollectorService extends AbstractComponent implements ClusterStateListener {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final?

});
}

public Map<String, ComputedNodeStats> getAllNodeStatistics() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be test only can we move it into a util in tests?

Copy link
Copy Markdown
Member Author

@dakrone dakrone Jul 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning on using this method for the very next PR that implements the actual ranking algorithm, without this method the NodeStatistics values would be exposed, which is the mutable one. By putting this method here NodeStatistics can be private and we can expose only the immutable ComputedNodeStats

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

* node's statistics. This includes the EWMA of queue size, response time,
* and service time.
*/
public class ComputedNodeStats {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this inner class be static and final and private

* time, and service time, however, this class is private and intended only
* to be used for the internal accounting of {@code ResponseCollectorService}.
*/
private class NodeStatistics {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also be static?

* to be used for the internal accounting of {@code ResponseCollectorService}.
*/
private class NodeStatistics {
public final String nodeId;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public is obsolete here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serviceTime is non final?! is that expected? just curious...

@dakrone
Copy link
Copy Markdown
Member Author

dakrone commented Jul 13, 2017

Thanks for the comments @s1monw! I added more changes based on your feedback

@dakrone dakrone requested a review from s1monw July 13, 2017 19:15
Copy link
Copy Markdown
Contributor

@s1monw s1monw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please apply your proposed patch other than that LGTM

@dakrone dakrone merged commit 610ba7e into elastic:master Jul 17, 2017
@dakrone
Copy link
Copy Markdown
Member Author

dakrone commented Jul 17, 2017

Thanks for all the reviews @s1monw!

@dakrone dakrone added :Search/Search Search-related issues that do not fall into other categories v6.0.0 labels Jul 24, 2017
@dakrone dakrone deleted the register-adaptive-selection-info branch December 13, 2017 20:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Search/Search Search-related issues that do not fall into other categories v6.0.0-beta1

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants