Skip to content

Commit e3386e1

Browse files
authored
Add minimum compatibility version to SearchRequest (elastic#65896)
* Adds a minimum version request parameter to SearchRequest. The minimum version helps failing a request if any shards involved in the search do not meet the compatibility requirements (all shards need to have a version equal or later than the minimum version provided).
1 parent 132f30d commit e3386e1

11 files changed

Lines changed: 618 additions & 16 deletions

File tree

qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/IndexingIT.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,11 @@ private List<Shard> buildShards(String index, Nodes nodes, RestClient client) th
400400
}
401401

402402
private Nodes buildNodeAndVersions() throws IOException {
403-
Response response = client().performRequest(new Request("GET", "_nodes"));
403+
return buildNodeAndVersions(client());
404+
}
405+
406+
static Nodes buildNodeAndVersions(RestClient client) throws IOException {
407+
Response response = client.performRequest(new Request("GET", "_nodes"));
404408
ObjectPath objectPath = ObjectPath.createFromResponse(response);
405409
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
406410
Nodes nodes = new Nodes();
@@ -411,12 +415,12 @@ private Nodes buildNodeAndVersions() throws IOException {
411415
Version.fromString(objectPath.evaluate("nodes." + id + ".version")),
412416
HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address"))));
413417
}
414-
response = client().performRequest(new Request("GET", "_cluster/state"));
418+
response = client.performRequest(new Request("GET", "_cluster/state"));
415419
nodes.setMasterNodeId(ObjectPath.createFromResponse(response).evaluate("master_node"));
416420
return nodes;
417421
}
418422

419-
final class Nodes extends HashMap<String, Node> {
423+
static final class Nodes extends HashMap<String, Node> {
420424

421425
private String masterNodeId = null;
422426

@@ -469,7 +473,7 @@ public String toString() {
469473
}
470474
}
471475

472-
final class Node {
476+
static final class Node {
473477
private final String id;
474478
private final String nodeName;
475479
private final Version version;
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.backwards;
20+
21+
import org.apache.http.HttpHost;
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.backwards.IndexingIT.Node;
24+
import org.elasticsearch.backwards.IndexingIT.Nodes;
25+
import org.elasticsearch.client.Request;
26+
import org.elasticsearch.client.Response;
27+
import org.elasticsearch.client.ResponseException;
28+
import org.elasticsearch.client.RestClient;
29+
import org.elasticsearch.cluster.metadata.IndexMetadata;
30+
import org.elasticsearch.common.CheckedRunnable;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.rest.RestStatus;
33+
import org.elasticsearch.test.rest.ESRestTestCase;
34+
import org.elasticsearch.test.rest.yaml.ObjectPath;
35+
import org.junit.Before;
36+
37+
import java.io.IOException;
38+
import java.util.ArrayList;
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
import static org.hamcrest.Matchers.containsString;
43+
import static org.hamcrest.Matchers.equalTo;
44+
45+
public class SearchWithMinCompatibleSearchNodeIT extends ESRestTestCase {
46+
47+
private static String index = "test_min_version";
48+
private static int numShards;
49+
private static int numReplicas = 1;
50+
private static int numDocs;
51+
private static Nodes nodes;
52+
private static List<Node> allNodes;
53+
private static Version bwcVersion;
54+
private static Version newVersion;
55+
56+
@Before
57+
public void prepareTestData() throws IOException {
58+
nodes = IndexingIT.buildNodeAndVersions(client());
59+
numShards = nodes.size();
60+
numDocs = randomIntBetween(numShards, 16);
61+
allNodes = new ArrayList<>();
62+
allNodes.addAll(nodes.getBWCNodes());
63+
allNodes.addAll(nodes.getNewNodes());
64+
bwcVersion = nodes.getBWCNodes().get(0).getVersion();
65+
newVersion = nodes.getNewNodes().get(0).getVersion();
66+
67+
if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) {
68+
createIndex(index, Settings.builder()
69+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards)
70+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build());
71+
for (int i = 0; i < numDocs; i++) {
72+
Request request = new Request("PUT", index + "/_doc/" + i);
73+
request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}");
74+
assertOK(client().performRequest(request));
75+
}
76+
ensureGreen(index);
77+
}
78+
}
79+
80+
public void testMinVersionAsNewVersion() throws Exception {
81+
try (RestClient client = buildClient(restClientSettings(),
82+
allNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
83+
Request newVersionRequest = new Request("POST",
84+
index + "/_search?min_compatible_shard_node=" + newVersion + "&ccs_minimize_roundtrips=false");
85+
assertBusy(() -> {
86+
ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(newVersionRequest));
87+
assertThat(responseException.getResponse().getStatusLine().getStatusCode(),
88+
equalTo(RestStatus.INTERNAL_SERVER_ERROR.getStatus()));
89+
assertThat(responseException.getMessage(),
90+
containsString("{\"error\":{\"root_cause\":[],\"type\":\"search_phase_execution_exception\""));
91+
assertThat(responseException.getMessage(), containsString("caused_by\":{\"type\":\"version_mismatch_exception\","
92+
+ "\"reason\":\"One of the shards is incompatible with the required minimum version [" + newVersion + "]\""));
93+
});
94+
}
95+
}
96+
97+
public void testMinVersionAsOldVersion() throws Exception {
98+
try (RestClient client = buildClient(restClientSettings(),
99+
allNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
100+
Request oldVersionRequest = new Request("POST", index + "/_search?min_compatible_shard_node=" + bwcVersion +
101+
"&ccs_minimize_roundtrips=false");
102+
oldVersionRequest.setJsonEntity("{\"query\":{\"match_all\":{}},\"_source\":false}");
103+
assertBusy(() -> {
104+
assertWithBwcVersionCheck(() -> {
105+
Response response = client.performRequest(oldVersionRequest);
106+
ObjectPath responseObject = ObjectPath.createFromResponse(response);
107+
Map<String, Object> shardsResult = responseObject.evaluate("_shards");
108+
assertThat(shardsResult.get("total"), equalTo(numShards));
109+
assertThat(shardsResult.get("successful"), equalTo(numShards));
110+
assertThat(shardsResult.get("failed"), equalTo(0));
111+
Map<String, Object> hitsResult = responseObject.evaluate("hits.total");
112+
assertThat(hitsResult.get("value"), equalTo(numDocs));
113+
assertThat(hitsResult.get("relation"), equalTo("eq"));
114+
}, client, oldVersionRequest);
115+
});
116+
}
117+
}
118+
119+
public void testCcsMinimizeRoundtripsIsFalse() throws Exception {
120+
try (RestClient client = buildClient(restClientSettings(),
121+
allNodes.stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
122+
Version version = randomBoolean() ? newVersion : bwcVersion;
123+
boolean shouldSetCcsMinimizeRoundtrips = randomBoolean();
124+
125+
Request request = new Request("POST", index + "/_search?min_compatible_shard_node=" + version +
126+
(shouldSetCcsMinimizeRoundtrips ? "&ccs_minimize_roundtrips=true" : ""));
127+
assertBusy(() -> {
128+
assertWithBwcVersionCheck(() -> {
129+
ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request));
130+
assertThat(responseException.getResponse().getStatusLine().getStatusCode(),
131+
equalTo(RestStatus.BAD_REQUEST.getStatus()));
132+
assertThat(responseException.getMessage(),
133+
containsString("{\"error\":{\"root_cause\":[{\"type\":\"action_request_validation_exception\""));
134+
assertThat(responseException.getMessage(), containsString("\"reason\":\"Validation Failed: 1: "
135+
+ "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\""));
136+
}, client, request);
137+
});
138+
}
139+
}
140+
141+
private void assertWithBwcVersionCheck(CheckedRunnable<Exception> code, RestClient client, Request request) throws Exception {
142+
if (bwcVersion.before(Version.V_8_0_0)) {
143+
// min_compatible_shard_node support doesn't exist in older versions and there will be an "unrecognized parameter" exception
144+
ResponseException exception = expectThrows(ResponseException.class, () -> client.performRequest(request));
145+
assertThat(exception.getResponse().getStatusLine().getStatusCode(),
146+
equalTo(RestStatus.BAD_REQUEST.getStatus()));
147+
assertThat(exception.getMessage(), containsString("contains unrecognized parameter: [min_compatible_shard_node]"));
148+
} else {
149+
code.run();
150+
}
151+
}
152+
}

rest-api-spec/src/main/resources/rest-api-spec/api/search.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@
230230
"type":"boolean",
231231
"description":"Indicates whether hits.total should be rendered as an integer or an object in the rest search response",
232232
"default":false
233+
},
234+
"min_compatible_shard_node":{
235+
"type":"string",
236+
"description":"The minimum compatible version that all shards involved in search should have for this request to be successful"
233237
}
234238
},
235239
"body":{

server/src/main/java/org/elasticsearch/ElasticsearchException.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1047,7 +1047,12 @@ private enum ElasticsearchExceptionHandle {
10471047
org.elasticsearch.transport.NoSeedNodeLeftException.class,
10481048
org.elasticsearch.transport.NoSeedNodeLeftException::new,
10491049
160,
1050-
Version.V_7_10_0);
1050+
Version.V_7_10_0),
1051+
VERSION_MISMATCH_EXCEPTION(
1052+
org.elasticsearch.action.search.VersionMismatchException.class,
1053+
org.elasticsearch.action.search.VersionMismatchException::new,
1054+
161,
1055+
Version.V_8_0_0);
10511056

10521057
final Class<? extends ElasticsearchException> exceptionClass;
10531058
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,13 @@ public final void run() {
222222
throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
223223
}
224224
}
225-
225+
Version version = request.minCompatibleShardNode();
226+
if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) {
227+
if (checkMinimumVersion(shardsIts) == false) {
228+
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]",
229+
request.minCompatibleShardNode());
230+
}
231+
}
226232
for (int i = 0; i < shardsIts.size(); i++) {
227233
final SearchShardIterator shardRoutings = shardsIts.get(i);
228234
assert shardRoutings.skip() == false;
@@ -240,6 +246,22 @@ void skipShard(SearchShardIterator iterator) {
240246
successfulShardExecution(iterator);
241247
}
242248

249+
250+
private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> shardsIts) {
251+
for (SearchShardIterator it : shardsIts) {
252+
if (it.getTargetNodeIds().isEmpty() == false) {
253+
boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> {
254+
Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId);
255+
return conn == null ? true : conn.getVersion().onOrAfter(request.minCompatibleShardNode());
256+
});
257+
if (isCompatible == false) {
258+
return false;
259+
}
260+
}
261+
}
262+
return true;
263+
}
264+
243265
protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
244266
/*
245267
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
@@ -660,7 +682,12 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
660682

661683
@Override
662684
public final Transport.Connection getConnection(String clusterAlias, String nodeId) {
663-
return nodeIdToConnection.apply(clusterAlias, nodeId);
685+
Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId);
686+
Version minVersion = request.minCompatibleShardNode();
687+
if (minVersion != null && conn != null && conn.getVersion().before(minVersion)) {
688+
throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion);
689+
}
690+
return conn;
664691
}
665692

666693
@Override

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,26 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla
9595

9696
private Integer preFilterShardSize;
9797

98-
private boolean ccsMinimizeRoundtrips = true;
98+
private Boolean ccsMinimizeRoundtrips;
99+
100+
@Nullable
101+
private Version minCompatibleShardNode;
99102

100103
public static final IndicesOptions DEFAULT_INDICES_OPTIONS =
101104
IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled();
102105

103106
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
104107

105108
public SearchRequest() {
109+
this((Version) null);
110+
}
111+
112+
public SearchRequest(Version minCompatibleShardNode) {
106113
this.localClusterAlias = null;
107114
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
108115
this.finalReduce = true;
116+
this.minCompatibleShardNode = minCompatibleShardNode;
117+
this.ccsMinimizeRoundtrips = minCompatibleShardNode == null;
109118
}
110119

111120
/**
@@ -182,6 +191,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
182191
this.localClusterAlias = localClusterAlias;
183192
this.absoluteStartMillis = absoluteStartMillis;
184193
this.finalReduce = finalReduce;
194+
this.minCompatibleShardNode = searchRequest.minCompatibleShardNode;
185195
}
186196

187197
/**
@@ -221,6 +231,11 @@ public SearchRequest(StreamInput in) throws IOException {
221231
finalReduce = true;
222232
}
223233
ccsMinimizeRoundtrips = in.readBoolean();
234+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
235+
if (in.readBoolean()) {
236+
minCompatibleShardNode = Version.readVersion(in);
237+
}
238+
}
224239
}
225240

226241
@Override
@@ -248,7 +263,12 @@ public void writeTo(StreamOutput out) throws IOException {
248263
out.writeBoolean(finalReduce);
249264
}
250265
out.writeBoolean(ccsMinimizeRoundtrips);
251-
266+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
267+
out.writeBoolean(minCompatibleShardNode != null);
268+
if (minCompatibleShardNode != null) {
269+
Version.writeVersion(minCompatibleShardNode, out);
270+
}
271+
}
252272
}
253273

254274
@Override
@@ -296,6 +316,12 @@ public ActionRequestValidationException validate() {
296316
}
297317
}
298318
}
319+
if (minCompatibleShardNode() != null) {
320+
if (isCcsMinimizeRoundtrips()) {
321+
validationException = addValidationError("[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible "
322+
+ "shard version", validationException);
323+
}
324+
}
299325
return validationException;
300326
}
301327

@@ -334,6 +360,15 @@ long getAbsoluteStartMillis() {
334360
return absoluteStartMillis;
335361
}
336362

363+
/**
364+
* Returns the minimum compatible shard version the search request needs to run on. If the version is null, then there are no
365+
* restrictions imposed on shards versions part of this search.
366+
*/
367+
@Nullable
368+
public Version minCompatibleShardNode() {
369+
return minCompatibleShardNode;
370+
}
371+
337372
/**
338373
* Sets the indices the search will be executed on.
339374
*/
@@ -368,7 +403,7 @@ public boolean includeDataStreams() {
368403

369404
/**
370405
* Returns whether network round-trips should be minimized when executing cross-cluster search requests.
371-
* Defaults to <code>true</code>.
406+
* Defaults to <code>true</code>, unless <code>minCompatibleShardNode</code> is set in which case it's set to <code>false</code>.
372407
*/
373408
public boolean isCcsMinimizeRoundtrips() {
374409
return ccsMinimizeRoundtrips;
@@ -671,14 +706,15 @@ public boolean equals(Object o) {
671706
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
672707
Objects.equals(localClusterAlias, that.localClusterAlias) &&
673708
absoluteStartMillis == that.absoluteStartMillis &&
674-
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips;
709+
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips &&
710+
Objects.equals(minCompatibleShardNode, that.minCompatibleShardNode);
675711
}
676712

677713
@Override
678714
public int hashCode() {
679715
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
680716
scroll, indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
681-
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips);
717+
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, minCompatibleShardNode);
682718
}
683719

684720
@Override

0 commit comments

Comments
 (0)