Skip to content

Commit 95b42a7

Browse files
committed
Ensure incremental bulk setting is set atomically (#112479)
Currently the rest.incremental_bulk is read in two different places. This means that it will be employed in two steps introducing unpredictable behavior. This commit ensures that it is only read in a single place.
1 parent a03fb12 commit 95b42a7

7 files changed

Lines changed: 112 additions & 92 deletions

File tree

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
import io.netty.util.ReferenceCounted;
1515

1616
import org.elasticsearch.ESNetty4IntegTestCase;
17+
import org.elasticsearch.action.bulk.IncrementalBulkService;
1718
import org.elasticsearch.common.settings.Settings;
1819
import org.elasticsearch.common.transport.TransportAddress;
1920
import org.elasticsearch.common.unit.ByteSizeUnit;
2021
import org.elasticsearch.common.unit.ByteSizeValue;
2122
import org.elasticsearch.core.Tuple;
2223
import org.elasticsearch.http.HttpServerTransport;
2324
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
24-
import org.elasticsearch.rest.action.document.RestBulkAction;
2525
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
2626
import org.elasticsearch.test.ESIntegTestCase.Scope;
2727

@@ -54,7 +54,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5454
return Settings.builder()
5555
.put(super.nodeSettings(nodeOrdinal, otherSettings))
5656
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
57-
.put(RestBulkAction.INCREMENTAL_BULK.getKey(), false)
57+
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false)
5858
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
5959
.build();
6060
}

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java

Lines changed: 76 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88

99
package org.elasticsearch.http;
1010

11+
import org.elasticsearch.action.bulk.IncrementalBulkService;
1112
import org.elasticsearch.client.Request;
1213
import org.elasticsearch.client.Response;
1314
import org.elasticsearch.client.ResponseException;
15+
import org.elasticsearch.common.settings.Settings;
1416
import org.elasticsearch.common.xcontent.XContentHelper;
1517
import org.elasticsearch.test.ESIntegTestCase;
1618
import org.elasticsearch.xcontent.json.JsonXContent;
@@ -25,7 +27,6 @@
2527
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
2628
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
2729

28-
@SuppressWarnings("unchecked")
2930
public void testIncrementalBulk() throws IOException {
3031
Request createRequest = new Request("PUT", "/index_name");
3132
createRequest.setJsonEntity("""
@@ -55,35 +56,52 @@ public void testIncrementalBulk() throws IOException {
5556
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
5657
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
5758

58-
Request bulkRequest = new Request("POST", "/index_name/_bulk");
59+
sendLargeBulk();
60+
}
61+
62+
public void testBulkWithIncrementalDisabled() throws IOException {
63+
Request createRequest = new Request("PUT", "/index_name");
64+
createRequest.setJsonEntity("""
65+
{
66+
"settings": {
67+
"index": {
68+
"number_of_shards": 1,
69+
"number_of_replicas": 1,
70+
"write.wait_for_active_shards": 2
71+
}
72+
}
73+
}""");
74+
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
75+
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
76+
77+
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
5978

6079
// index documents for the rollup job
61-
final StringBuilder bulk = new StringBuilder();
62-
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
63-
int updates = 0;
64-
for (int i = 0; i < 1000; i++) {
65-
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
66-
bulk.append("{\"field\":").append(i).append("}\n");
67-
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
68-
++updates;
69-
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
70-
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
71-
}
72-
}
73-
bulk.append("\r\n");
80+
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
81+
+ "{\"field\":1}\n"
82+
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
83+
+ "{\"field\":1}\n"
84+
+ "\r\n";
7485

75-
bulkRequest.setJsonEntity(bulk.toString());
86+
firstBulkRequest.setJsonEntity(bulkBody);
7687

77-
final Response bulkResponse = getRestClient().performRequest(bulkRequest);
78-
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
79-
Map<String, Object> responseMap = XContentHelper.convertToMap(
80-
JsonXContent.jsonXContent,
81-
bulkResponse.getEntity().getContent(),
82-
true
83-
);
88+
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
89+
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
8490

85-
assertFalse((Boolean) responseMap.get("errors"));
86-
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
91+
clusterAdmin().prepareUpdateSettings()
92+
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false).build())
93+
.get();
94+
95+
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false));
96+
97+
try {
98+
sendLargeBulk();
99+
} finally {
100+
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
101+
clusterAdmin().prepareUpdateSettings()
102+
.setPersistentSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null).build())
103+
.get();
104+
}
87105
}
88106

89107
public void testIncrementalMalformed() throws IOException {
@@ -114,4 +132,37 @@ public void testIncrementalMalformed() throws IOException {
114132

115133
expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
116134
}
135+
136+
@SuppressWarnings("unchecked")
137+
private static void sendLargeBulk() throws IOException {
138+
Request bulkRequest = new Request("POST", "/index_name/_bulk");
139+
140+
// index documents for the rollup job
141+
final StringBuilder bulk = new StringBuilder();
142+
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
143+
int updates = 0;
144+
for (int i = 0; i < 1000; i++) {
145+
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
146+
bulk.append("{\"field\":").append(i).append("}\n");
147+
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
148+
++updates;
149+
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
150+
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
151+
}
152+
}
153+
bulk.append("\r\n");
154+
155+
bulkRequest.setJsonEntity(bulk.toString());
156+
157+
final Response bulkResponse = getRestClient().performRequest(bulkRequest);
158+
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
159+
Map<String, Object> responseMap = XContentHelper.convertToMap(
160+
JsonXContent.jsonXContent,
161+
bulkResponse.getEntity().getContent(),
162+
true
163+
);
164+
165+
assertFalse((Boolean) responseMap.get("errors"));
166+
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
167+
}
117168
}

server/src/main/java/org/elasticsearch/action/bulk/IncrementalBulkService.java

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,74 +15,72 @@
1515
import org.elasticsearch.action.support.ActiveShardCount;
1616
import org.elasticsearch.client.internal.Client;
1717
import org.elasticsearch.common.settings.ClusterSettings;
18+
import org.elasticsearch.common.settings.Setting;
1819
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1920
import org.elasticsearch.common.util.concurrent.ThreadContext;
2021
import org.elasticsearch.core.Nullable;
2122
import org.elasticsearch.core.Releasable;
2223
import org.elasticsearch.core.Releasables;
2324
import org.elasticsearch.core.TimeValue;
2425
import org.elasticsearch.index.IndexingPressure;
25-
import org.elasticsearch.rest.action.document.RestBulkAction;
2626

2727
import java.util.ArrayList;
2828
import java.util.Collections;
2929
import java.util.List;
3030
import java.util.concurrent.atomic.AtomicBoolean;
3131
import java.util.function.Supplier;
3232

33+
import static org.elasticsearch.common.settings.Setting.boolSetting;
34+
3335
public class IncrementalBulkService {
3436

37+
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
38+
"rest.incremental_bulk",
39+
true,
40+
Setting.Property.NodeScope,
41+
Setting.Property.Dynamic
42+
);
3543
private final Client client;
44+
private final AtomicBoolean enabledForTests = new AtomicBoolean(true);
3645
private final IndexingPressure indexingPressure;
3746
private final ThreadContext threadContext;
38-
private final Supplier<Boolean> enabled;
3947

4048
public IncrementalBulkService(Client client, IndexingPressure indexingPressure, ThreadContext threadContext) {
41-
this(client, indexingPressure, threadContext, new Enabled());
42-
}
43-
44-
public IncrementalBulkService(
45-
Client client,
46-
IndexingPressure indexingPressure,
47-
ThreadContext threadContext,
48-
ClusterSettings clusterSettings
49-
) {
50-
this(client, indexingPressure, threadContext, new Enabled(clusterSettings));
51-
}
52-
53-
public IncrementalBulkService(
54-
Client client,
55-
IndexingPressure indexingPressure,
56-
ThreadContext threadContext,
57-
Supplier<Boolean> enabled
58-
) {
5949
this.client = client;
6050
this.indexingPressure = indexingPressure;
6151
this.threadContext = threadContext;
62-
this.enabled = enabled;
63-
}
64-
65-
public boolean incrementalBulkEnabled() {
66-
return enabled.get();
6752
}
6853

6954
public Handler newBulkRequest() {
55+
ensureEnabled();
7056
return newBulkRequest(null, null, null);
7157
}
7258

7359
public Handler newBulkRequest(@Nullable String waitForActiveShards, @Nullable TimeValue timeout, @Nullable String refresh) {
60+
ensureEnabled();
7461
return new Handler(client, threadContext, indexingPressure, waitForActiveShards, timeout, refresh);
7562
}
7663

64+
private void ensureEnabled() {
65+
if (enabledForTests.get() == false) {
66+
throw new AssertionError("Unexpected incremental bulk request");
67+
}
68+
}
69+
70+
// This method only exists to tests that the feature flag works. Remove once we no longer need the flag.
71+
public void setForTests(boolean value) {
72+
enabledForTests.set(value);
73+
}
74+
7775
public static class Enabled implements Supplier<Boolean> {
7876

7977
private final AtomicBoolean incrementalBulksEnabled = new AtomicBoolean(true);
8078

8179
public Enabled() {}
8280

8381
public Enabled(ClusterSettings clusterSettings) {
84-
incrementalBulksEnabled.set(clusterSettings.get(RestBulkAction.INCREMENTAL_BULK));
85-
clusterSettings.addSettingsUpdateConsumer(RestBulkAction.INCREMENTAL_BULK, incrementalBulksEnabled::set);
82+
incrementalBulksEnabled.set(clusterSettings.get(INCREMENTAL_BULK));
83+
clusterSettings.addSettingsUpdateConsumer(INCREMENTAL_BULK, incrementalBulksEnabled::set);
8684
}
8785

8886
@Override

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
1313
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
14+
import org.elasticsearch.action.bulk.IncrementalBulkService;
1415
import org.elasticsearch.action.bulk.WriteAckDelay;
1516
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
1617
import org.elasticsearch.action.ingest.SimulatePipelineTransportAction;
@@ -113,7 +114,6 @@
113114
import org.elasticsearch.readiness.ReadinessService;
114115
import org.elasticsearch.repositories.fs.FsRepository;
115116
import org.elasticsearch.rest.BaseRestHandler;
116-
import org.elasticsearch.rest.action.document.RestBulkAction;
117117
import org.elasticsearch.script.ScriptService;
118118
import org.elasticsearch.search.SearchModule;
119119
import org.elasticsearch.search.SearchService;
@@ -243,7 +243,7 @@ public void apply(Settings value, Settings current, Settings previous) {
243243
Metadata.SETTING_READ_ONLY_SETTING,
244244
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
245245
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
246-
RestBulkAction.INCREMENTAL_BULK,
246+
IncrementalBulkService.INCREMENTAL_BULK,
247247
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
248248
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
249249
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -895,8 +895,7 @@ private void construct(
895895
final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
896896
client,
897897
indexingLimits,
898-
threadPool.getThreadContext(),
899-
clusterService.getClusterSettings()
898+
threadPool.getThreadContext()
900899
);
901900

902901
ActionModule actionModule = new ActionModule(

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.bytes.BytesReference;
2020
import org.elasticsearch.common.bytes.CompositeBytesReference;
2121
import org.elasticsearch.common.bytes.ReleasableBytesReference;
22-
import org.elasticsearch.common.settings.Setting;
2322
import org.elasticsearch.common.settings.Settings;
2423
import org.elasticsearch.core.Releasable;
2524
import org.elasticsearch.core.Releasables;
@@ -42,7 +41,6 @@
4241
import java.util.Map;
4342
import java.util.function.Supplier;
4443

45-
import static org.elasticsearch.common.settings.Setting.boolSetting;
4644
import static org.elasticsearch.rest.RestRequest.Method.POST;
4745
import static org.elasticsearch.rest.RestRequest.Method.PUT;
4846

@@ -59,12 +57,6 @@
5957
public class RestBulkAction extends BaseRestHandler {
6058

6159
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
62-
public static final Setting<Boolean> INCREMENTAL_BULK = boolSetting(
63-
"rest.incremental_bulk",
64-
true,
65-
Setting.Property.NodeScope,
66-
Setting.Property.Dynamic
67-
);
6860

6961
private final boolean allowExplicitIndex;
7062
private final IncrementalBulkService bulkHandler;
@@ -93,7 +85,7 @@ public String getName() {
9385

9486
@Override
9587
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
96-
if (bulkHandler.incrementalBulkEnabled() == false) {
88+
if (request.isStreamedContent() == false) {
9789
if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
9890
request.param("type");
9991
}

server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
6767
params.put("pipeline", "timestamps");
6868
new RestBulkAction(
6969
settings(IndexVersion.current()).build(),
70-
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY), () -> false)
70+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
7171
).handleRequest(
7272
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withParams(params).withContent(new BytesArray("""
7373
{"index":{"_id":"1"}}
@@ -102,12 +102,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
102102
{
103103
new RestBulkAction(
104104
settings(IndexVersion.current()).build(),
105-
new IncrementalBulkService(
106-
mock(Client.class),
107-
mock(IndexingPressure.class),
108-
new ThreadContext(Settings.EMPTY),
109-
() -> false
110-
)
105+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
111106
).handleRequest(
112107
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
113108
.withParams(params)
@@ -131,12 +126,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
131126
bulkCalled.set(false);
132127
new RestBulkAction(
133128
settings(IndexVersion.current()).build(),
134-
new IncrementalBulkService(
135-
mock(Client.class),
136-
mock(IndexingPressure.class),
137-
new ThreadContext(Settings.EMPTY),
138-
() -> false
139-
)
129+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
140130
).handleRequest(
141131
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
142132
.withParams(params)
@@ -159,12 +149,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
159149
bulkCalled.set(false);
160150
new RestBulkAction(
161151
settings(IndexVersion.current()).build(),
162-
new IncrementalBulkService(
163-
mock(Client.class),
164-
mock(IndexingPressure.class),
165-
new ThreadContext(Settings.EMPTY),
166-
() -> false
167-
)
152+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
168153
).handleRequest(
169154
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
170155
.withParams(params)
@@ -188,12 +173,7 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
188173
bulkCalled.set(false);
189174
new RestBulkAction(
190175
settings(IndexVersion.current()).build(),
191-
new IncrementalBulkService(
192-
mock(Client.class),
193-
mock(IndexingPressure.class),
194-
new ThreadContext(Settings.EMPTY),
195-
() -> false
196-
)
176+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), new ThreadContext(Settings.EMPTY))
197177
).handleRequest(
198178
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
199179
.withParams(params)

0 commit comments

Comments
 (0)