Skip to content

Commit a3c8cac

Browse files
CR: add test for bulk rejection behaviour
1 parent 0b3b4ab commit a3c8cac

2 files changed

Lines changed: 77 additions & 1 deletion

File tree

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ protected void doRun() {
171171

172172
@Override
173173
public void onRejection(Exception e) {
174+
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
174175
while (context.hasMoreOperationsToExecute()) {
175176
context.setRequestToExecute(context.getCurrent());
176177
final long version = context.getRequestToExecute().version();
@@ -179,7 +180,6 @@ public void onRejection(Exception e) {
179180
: primary.getFailedIndexResult(e, version);
180181
onComplete(result, context, null);
181182
}
182-
// We're done, there's no more operations to execute so we resolve the wrapped listener
183183
finishRequest();
184184
}
185185

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.bulk;
20+
21+
import org.elasticsearch.action.ActionFuture;
22+
import org.elasticsearch.action.index.IndexRequest;
23+
import org.elasticsearch.action.support.WriteRequest;
24+
import org.elasticsearch.client.Client;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.test.ESIntegTestCase;
27+
28+
import java.util.Collections;
29+
30+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
31+
32+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2)
33+
public class BulkRejectionIT extends ESIntegTestCase {
34+
35+
@Override
36+
protected Settings nodeSettings(int nodeOrdinal) {
37+
return Settings.builder()
38+
.put(super.nodeSettings(nodeOrdinal))
39+
.put("thread_pool.write.size", 1)
40+
.put("thread_pool.write.queue_size", 1)
41+
.build();
42+
}
43+
44+
@Override
45+
protected int numberOfReplicas() {
46+
return 1;
47+
}
48+
49+
protected int numberOfShards() {
50+
return 5;
51+
}
52+
53+
public void testBulkRejectionAfterDynamicMappingUpdate() throws Exception {
54+
final String index = "test";
55+
assertAcked(prepareCreate(index));
56+
ensureGreen();
57+
final BulkRequest request1 = new BulkRequest();
58+
for (int i = 0; i < 500; ++i) {
59+
request1.add(new IndexRequest(index).source(Collections.singletonMap("key" + i, "value" + i)))
60+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
61+
}
62+
// Huge request to keep the write pool busy so that requests waiting on a mapping update in the other bulk request get rejected
63+
// by the write pool
64+
final BulkRequest request2 = new BulkRequest();
65+
for (int i = 0; i < 10_000; ++i) {
66+
request2.add(new IndexRequest(index).source(Collections.singletonMap("key", "valuea" + i)))
67+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
68+
}
69+
final Client client = client();
70+
final ActionFuture<BulkResponse> bulkFuture1 = client.bulk(request1);
71+
final ActionFuture<BulkResponse> bulkFuture2 = client.bulk(request2);
72+
bulkFuture1.actionGet();
73+
bulkFuture2.actionGet();
74+
internalCluster().assertSeqNos();
75+
}
76+
}

0 commit comments

Comments
 (0)