Skip to content

Commit a03fb12

Browse files
committed
Incremental bulk integration with rest layer (#112154)
Integrate the incremental bulks into RestBulkAction
1 parent c00768a commit a03fb12

17 files changed

Lines changed: 675 additions & 100 deletions

File tree

distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public void testIndexMissingBody() throws IOException {
2626
assertResponseException(responseException, "request body is required");
2727
}
2828

29+
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
2930
public void testBulkMissingBody() throws IOException {
3031
ResponseException responseException = expectThrows(
3132
ResponseException.class,

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestSizeLimitIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.core.Tuple;
2222
import org.elasticsearch.http.HttpServerTransport;
2323
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
24+
import org.elasticsearch.rest.action.document.RestBulkAction;
2425
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
2526
import org.elasticsearch.test.ESIntegTestCase.Scope;
2627

@@ -52,6 +53,8 @@ protected boolean addMockHttpTransport() {
5253
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
5354
return Settings.builder()
5455
.put(super.nodeSettings(nodeOrdinal, otherSettings))
56+
// TODO: We do not currently support in flight circuit breaker limits for bulk. However, IndexingPressure applies
57+
.put(RestBulkAction.INCREMENTAL_BULK.getKey(), false)
5558
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
5659
.build();
5760
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@ public class Netty4HttpAggregator extends HttpObjectAggregator {
3636
private boolean aggregating = true;
3737
private boolean ignoreContentAfterContinueResponse = false;
3838

39-
public Netty4HttpAggregator(int maxContentLength) {
40-
this(maxContentLength, IGNORE_TEST);
41-
}
42-
4339
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
4440
super(maxContentLength);
4541
this.decider = decider;
@@ -50,7 +46,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5046
assert msg instanceof HttpObject;
5147
if (msg instanceof HttpRequest request) {
5248
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
53-
aggregating = decider.test(preReq);
49+
aggregating = decider.test(preReq) && IGNORE_TEST.test(preReq);
5450
}
5551
if (aggregating || msg instanceof FullHttpRequest) {
5652
super.channelRead(ctx, msg);

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.logging.log4j.LogManager;
3838
import org.apache.logging.log4j.Logger;
3939
import org.elasticsearch.ExceptionsHelper;
40+
import org.elasticsearch.action.bulk.IncrementalBulkService;
4041
import org.elasticsearch.common.network.CloseableChannel;
4142
import org.elasticsearch.common.network.NetworkService;
4243
import org.elasticsearch.common.network.ThreadWatchdog;
@@ -97,6 +98,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
9798
private final TLSConfig tlsConfig;
9899
private final AcceptChannelHandler.AcceptPredicate acceptChannelPredicate;
99100
private final HttpValidator httpValidator;
101+
private final IncrementalBulkService.Enabled enabled;
100102
private final ThreadWatchdog threadWatchdog;
101103
private final int readTimeoutMillis;
102104

@@ -135,6 +137,7 @@ public Netty4HttpServerTransport(
135137
this.acceptChannelPredicate = acceptChannelPredicate;
136138
this.httpValidator = httpValidator;
137139
this.threadWatchdog = networkService.getThreadWatchdog();
140+
this.enabled = new IncrementalBulkService.Enabled(clusterSettings);
138141

139142
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
140143

@@ -280,7 +283,7 @@ public void onException(HttpChannel channel, Exception cause) {
280283
}
281284

282285
public ChannelHandler configureServerChannelHandler() {
283-
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator);
286+
return new HttpChannelHandler(this, handlingSettings, tlsConfig, acceptChannelPredicate, httpValidator, enabled);
284287
}
285288

286289
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
@@ -293,19 +296,22 @@ protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
293296
private final TLSConfig tlsConfig;
294297
private final BiPredicate<String, InetSocketAddress> acceptChannelPredicate;
295298
private final HttpValidator httpValidator;
299+
private final IncrementalBulkService.Enabled enabled;
296300

297301
protected HttpChannelHandler(
298302
final Netty4HttpServerTransport transport,
299303
final HttpHandlingSettings handlingSettings,
300304
final TLSConfig tlsConfig,
301305
@Nullable final BiPredicate<String, InetSocketAddress> acceptChannelPredicate,
302-
@Nullable final HttpValidator httpValidator
306+
@Nullable final HttpValidator httpValidator,
307+
IncrementalBulkService.Enabled enabled
303308
) {
304309
this.transport = transport;
305310
this.handlingSettings = handlingSettings;
306311
this.tlsConfig = tlsConfig;
307312
this.acceptChannelPredicate = acceptChannelPredicate;
308313
this.httpValidator = httpValidator;
314+
this.enabled = enabled;
309315
}
310316

311317
@Override
@@ -366,7 +372,13 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
366372
);
367373
}
368374
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
369-
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
375+
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(
376+
handlingSettings.maxContentLength(),
377+
httpPreRequest -> enabled.get() == false
378+
|| (httpPreRequest.uri().contains("_bulk") == false
379+
|| httpPreRequest.uri().contains("_bulk_update")
380+
|| httpPreRequest.uri().contains("/_xpack/monitoring/_bulk"))
381+
);
370382
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
371383
ch.pipeline()
372384
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression

modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.ElasticsearchSecurityException;
4747
import org.elasticsearch.ElasticsearchWrapperException;
4848
import org.elasticsearch.action.ActionListener;
49+
import org.elasticsearch.action.bulk.IncrementalBulkService;
4950
import org.elasticsearch.action.support.ActionTestUtils;
5051
import org.elasticsearch.action.support.SubscribableListener;
5152
import org.elasticsearch.client.Request;
@@ -419,7 +420,8 @@ public ChannelHandler configureServerChannelHandler() {
419420
handlingSettings,
420421
TLSConfig.noTLS(),
421422
null,
422-
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
423+
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null),
424+
new IncrementalBulkService.Enabled(clusterSettings)
423425
) {
424426
@Override
425427
protected void initChannel(Channel ch) throws Exception {
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http;
10+
11+
import org.elasticsearch.client.Request;
12+
import org.elasticsearch.client.Response;
13+
import org.elasticsearch.client.ResponseException;
14+
import org.elasticsearch.common.xcontent.XContentHelper;
15+
import org.elasticsearch.test.ESIntegTestCase;
16+
import org.elasticsearch.xcontent.json.JsonXContent;
17+
18+
import java.io.IOException;
19+
import java.util.List;
20+
import java.util.Map;
21+
22+
import static org.elasticsearch.rest.RestStatus.OK;
23+
import static org.hamcrest.Matchers.equalTo;
24+
25+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
26+
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
27+
28+
@SuppressWarnings("unchecked")
29+
public void testIncrementalBulk() throws IOException {
30+
Request createRequest = new Request("PUT", "/index_name");
31+
createRequest.setJsonEntity("""
32+
{
33+
"settings": {
34+
"index": {
35+
"number_of_shards": 1,
36+
"number_of_replicas": 1,
37+
"write.wait_for_active_shards": 2
38+
}
39+
}
40+
}""");
41+
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
42+
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
43+
44+
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
45+
46+
// index documents for the rollup job
47+
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
48+
+ "{\"field\":1}\n"
49+
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
50+
+ "{\"field\":1}\n"
51+
+ "\r\n";
52+
53+
firstBulkRequest.setJsonEntity(bulkBody);
54+
55+
final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest);
56+
assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
57+
58+
Request bulkRequest = new Request("POST", "/index_name/_bulk");
59+
60+
// index documents for the rollup job
61+
final StringBuilder bulk = new StringBuilder();
62+
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
63+
int updates = 0;
64+
for (int i = 0; i < 1000; i++) {
65+
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
66+
bulk.append("{\"field\":").append(i).append("}\n");
67+
if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) {
68+
++updates;
69+
bulk.append("{\"update\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n");
70+
bulk.append("{\"doc\":{\"field\":").append(i).append("}}\n");
71+
}
72+
}
73+
bulk.append("\r\n");
74+
75+
bulkRequest.setJsonEntity(bulk.toString());
76+
77+
final Response bulkResponse = getRestClient().performRequest(bulkRequest);
78+
assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
79+
Map<String, Object> responseMap = XContentHelper.convertToMap(
80+
JsonXContent.jsonXContent,
81+
bulkResponse.getEntity().getContent(),
82+
true
83+
);
84+
85+
assertFalse((Boolean) responseMap.get("errors"));
86+
assertThat(((List<Object>) responseMap.get("items")).size(), equalTo(1001 + updates));
87+
}
88+
89+
public void testIncrementalMalformed() throws IOException {
90+
Request createRequest = new Request("PUT", "/index_name");
91+
createRequest.setJsonEntity("""
92+
{
93+
"settings": {
94+
"index": {
95+
"number_of_shards": 1,
96+
"number_of_replicas": 1,
97+
"write.wait_for_active_shards": 2
98+
}
99+
}
100+
}""");
101+
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
102+
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
103+
104+
Request bulkRequest = new Request("POST", "/index_name/_bulk");
105+
106+
// index documents for the rollup job
107+
final StringBuilder bulk = new StringBuilder();
108+
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
109+
bulk.append("{\"field\":1}\n");
110+
bulk.append("{}\n");
111+
bulk.append("\r\n");
112+
113+
bulkRequest.setJsonEntity(bulk.toString());
114+
115+
expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest));
116+
}
117+
}

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@
160160
import org.elasticsearch.action.admin.indices.template.put.TransportPutIndexTemplateAction;
161161
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
162162
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
163+
import org.elasticsearch.action.bulk.IncrementalBulkService;
163164
import org.elasticsearch.action.bulk.SimulateBulkAction;
164165
import org.elasticsearch.action.bulk.TransportBulkAction;
165166
import org.elasticsearch.action.bulk.TransportShardBulkAction;
@@ -448,6 +449,7 @@ public class ActionModule extends AbstractModule {
448449
private final List<ActionPlugin> actionPlugins;
449450
private final Map<String, ActionHandler<?, ?>> actions;
450451
private final ActionFilters actionFilters;
452+
private final IncrementalBulkService bulkService;
451453
private final AutoCreateIndex autoCreateIndex;
452454
private final DestructiveOperations destructiveOperations;
453455
private final RestController restController;
@@ -476,7 +478,8 @@ public ActionModule(
476478
ClusterService clusterService,
477479
RerouteService rerouteService,
478480
List<ReservedClusterStateHandler<?>> reservedStateHandlers,
479-
RestExtension restExtension
481+
RestExtension restExtension,
482+
IncrementalBulkService bulkService
480483
) {
481484
this.settings = settings;
482485
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -488,6 +491,7 @@ public ActionModule(
488491
this.threadPool = threadPool;
489492
actions = setupActions(actionPlugins);
490493
actionFilters = setupActionFilters(actionPlugins);
494+
this.bulkService = bulkService;
491495
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
492496
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
493497
Set<RestHeaderDefinition> headers = Stream.concat(
@@ -928,7 +932,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
928932
registerHandler.accept(new RestCountAction());
929933
registerHandler.accept(new RestTermVectorsAction());
930934
registerHandler.accept(new RestMultiTermVectorsAction());
931-
registerHandler.accept(new RestBulkAction(settings));
935+
registerHandler.accept(new RestBulkAction(settings, bulkService));
932936
registerHandler.accept(new RestUpdateAction());
933937

934938
registerHandler.accept(new RestSearchAction(restController.getSearchUsageHolder(), clusterSupportsFeature));

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,13 @@ public BulkRequestParser(boolean deprecateOrErrorOnType, RestApiVersion restApiV
8686
.withRestApiVersion(restApiVersion);
8787
}
8888

89-
private static int findNextMarker(byte marker, int from, BytesReference data) {
89+
private static int findNextMarker(byte marker, int from, BytesReference data, boolean isIncremental) {
9090
final int res = data.indexOf(marker, from);
9191
if (res != -1) {
9292
assert res >= 0;
9393
return res;
9494
}
95-
if (from != data.length()) {
95+
if (from != data.length() && isIncremental == false) {
9696
throw new IllegalArgumentException("The bulk request must be terminated by a newline [\\n]");
9797
}
9898
return res;
@@ -137,18 +137,57 @@ public void parse(
137137
Consumer<UpdateRequest> updateRequestConsumer,
138138
Consumer<DeleteRequest> deleteRequestConsumer
139139
) throws IOException {
140-
XContent xContent = xContentType.xContent();
141-
int line = 0;
142-
int from = 0;
143-
byte marker = xContent.bulkSeparator();
144140
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
145141
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
146142
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
147143
final Map<String, String> stringDeduplicator = new HashMap<>();
144+
145+
incrementalParse(
146+
data,
147+
defaultIndex,
148+
defaultRouting,
149+
defaultFetchSourceContext,
150+
defaultPipeline,
151+
defaultRequireAlias,
152+
defaultRequireDataStream,
153+
defaultListExecutedPipelines,
154+
allowExplicitIndex,
155+
xContentType,
156+
indexRequestConsumer,
157+
updateRequestConsumer,
158+
deleteRequestConsumer,
159+
false,
160+
stringDeduplicator
161+
);
162+
}
163+
164+
public int incrementalParse(
165+
BytesReference data,
166+
String defaultIndex,
167+
String defaultRouting,
168+
FetchSourceContext defaultFetchSourceContext,
169+
String defaultPipeline,
170+
Boolean defaultRequireAlias,
171+
Boolean defaultRequireDataStream,
172+
Boolean defaultListExecutedPipelines,
173+
boolean allowExplicitIndex,
174+
XContentType xContentType,
175+
BiConsumer<IndexRequest, String> indexRequestConsumer,
176+
Consumer<UpdateRequest> updateRequestConsumer,
177+
Consumer<DeleteRequest> deleteRequestConsumer,
178+
boolean isIncremental,
179+
Map<String, String> stringDeduplicator
180+
) throws IOException {
181+
XContent xContent = xContentType.xContent();
182+
byte marker = xContent.bulkSeparator();
148183
boolean typesDeprecationLogged = false;
149184

185+
int line = 0;
186+
int from = 0;
187+
int consumed = 0;
188+
150189
while (true) {
151-
int nextMarker = findNextMarker(marker, from, data);
190+
int nextMarker = findNextMarker(marker, from, data, isIncremental);
152191
if (nextMarker == -1) {
153192
break;
154193
}
@@ -333,8 +372,9 @@ public void parse(
333372
.setIfSeqNo(ifSeqNo)
334373
.setIfPrimaryTerm(ifPrimaryTerm)
335374
);
375+
consumed = from;
336376
} else {
337-
nextMarker = findNextMarker(marker, from, data);
377+
nextMarker = findNextMarker(marker, from, data, isIncremental);
338378
if (nextMarker == -1) {
339379
break;
340380
}
@@ -407,9 +447,11 @@ public void parse(
407447
}
408448
// move pointers
409449
from = nextMarker + 1;
450+
consumed = from;
410451
}
411452
}
412453
}
454+
return isIncremental ? consumed : from;
413455
}
414456

415457
@UpdateForV9

0 commit comments

Comments
 (0)