Skip to content

Commit 92daeeb

Browse files
committed
Properly handle empty incremental bulk requests (#112974)
This commit ensures we properly throw exceptions when an empty bulk request is received with the incremental handling enabled.
1 parent dce8a0b commit 92daeeb

3 files changed

Lines changed: 74 additions & 39 deletions

File tree

distribution/archives/integ-test-zip/src/javaRestTest/java/org/elasticsearch/test/rest/RequestsWithoutContentIT.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@ public void testIndexMissingBody() throws IOException {
2626
assertResponseException(responseException, "request body is required");
2727
}
2828

29-
@AwaitsFix(bugUrl = "need to decide how to handle this scenario")
3029
public void testBulkMissingBody() throws IOException {
31-
ResponseException responseException = expectThrows(
32-
ResponseException.class,
33-
() -> client().performRequest(new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"))
34-
);
30+
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
31+
request.setJsonEntity("");
32+
ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
3533
assertResponseException(responseException, "request body is required");
3634
}
3735

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/IncrementalBulkRestIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,33 @@
2323
import java.util.Map;
2424

2525
import static org.elasticsearch.rest.RestStatus.OK;
26+
import static org.hamcrest.CoreMatchers.containsString;
2627
import static org.hamcrest.Matchers.equalTo;
2728

2829
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
2930
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
3031

32+
public void testBulkMissingBody() throws IOException {
33+
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
34+
request.setJsonEntity("");
35+
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
36+
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
37+
assertThat(responseException.getMessage(), containsString("request body is required"));
38+
}
39+
40+
public void testBulkRequestBodyImproperlyTerminated() throws IOException {
41+
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
42+
// missing final line of the bulk body. cannot process
43+
request.setJsonEntity(
44+
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
45+
+ "{\"field\":1}\n"
46+
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
47+
);
48+
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
49+
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
50+
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
51+
}
52+
3153
public void testIncrementalBulk() throws IOException {
3254
Request createRequest = new Request("PUT", "/index_name");
3355
createRequest.setJsonEntity("""

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.rest.action.document;
1111

12+
import org.elasticsearch.ElasticsearchParseException;
1213
import org.elasticsearch.action.DocWriteRequest;
1314
import org.elasticsearch.action.bulk.BulkRequest;
1415
import org.elasticsearch.action.bulk.BulkRequestParser;
@@ -150,6 +151,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
150151

151152
private volatile RestChannel restChannel;
152153
private boolean shortCircuited;
154+
private int bytesParsed = 0;
153155
private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4);
154156
private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4);
155157

@@ -186,48 +188,61 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
186188

187189
final BytesReference data;
188190
int bytesConsumed;
189-
try {
190-
unParsedChunks.add(chunk);
191+
if (chunk.length() == 0) {
192+
chunk.close();
193+
bytesConsumed = 0;
194+
} else {
195+
try {
196+
unParsedChunks.add(chunk);
191197

192-
if (unParsedChunks.size() > 1) {
193-
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
194-
} else {
195-
data = chunk;
196-
}
198+
if (unParsedChunks.size() > 1) {
199+
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
200+
} else {
201+
data = chunk;
202+
}
197203

198-
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
199-
// BulkRequest#add is fine
200-
bytesConsumed = parser.incrementalParse(
201-
data,
202-
defaultIndex,
203-
defaultRouting,
204-
defaultFetchSourceContext,
205-
defaultPipeline,
206-
defaultRequireAlias,
207-
defaultRequireDataStream,
208-
defaultListExecutedPipelines,
209-
allowExplicitIndex,
210-
request.getXContentType(),
211-
(request, type) -> items.add(request),
212-
items::add,
213-
items::add,
214-
isLast == false,
215-
stringDeduplicator
216-
);
204+
// TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in
205+
// BulkRequest#add is fine
206+
bytesConsumed = parser.incrementalParse(
207+
data,
208+
defaultIndex,
209+
defaultRouting,
210+
defaultFetchSourceContext,
211+
defaultPipeline,
212+
defaultRequireAlias,
213+
defaultRequireDataStream,
214+
defaultListExecutedPipelines,
215+
allowExplicitIndex,
216+
request.getXContentType(),
217+
(request, type) -> items.add(request),
218+
items::add,
219+
items::add,
220+
isLast == false,
221+
stringDeduplicator
222+
);
223+
bytesParsed += bytesConsumed;
217224

218-
} catch (Exception e) {
219-
shortCircuit();
220-
new RestToXContentListener<>(channel).onFailure(e);
221-
return;
225+
} catch (Exception e) {
226+
shortCircuit();
227+
new RestToXContentListener<>(channel).onFailure(
228+
new ElasticsearchParseException("could not parse bulk request body", e)
229+
);
230+
return;
231+
}
222232
}
223233

224234
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
225235
if (isLast) {
226236
assert unParsedChunks.isEmpty();
227-
assert channel != null;
228-
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
229-
items.clear();
230-
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
237+
if (bytesParsed == 0) {
238+
shortCircuit();
239+
new RestToXContentListener<>(channel).onFailure(new ElasticsearchParseException("request body is required"));
240+
} else {
241+
assert channel != null;
242+
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
243+
items.clear();
244+
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
245+
}
231246
} else if (items.isEmpty() == false) {
232247
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
233248
items.clear();

0 commit comments

Comments
 (0)