|
9 | 9 |
|
10 | 10 | package org.elasticsearch.rest.action.document; |
11 | 11 |
|
| 12 | +import org.elasticsearch.ElasticsearchParseException; |
12 | 13 | import org.elasticsearch.action.DocWriteRequest; |
13 | 14 | import org.elasticsearch.action.bulk.BulkRequest; |
14 | 15 | import org.elasticsearch.action.bulk.BulkRequestParser; |
@@ -150,6 +151,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { |
150 | 151 |
|
151 | 152 | private volatile RestChannel restChannel; |
152 | 153 | private boolean shortCircuited; |
| 154 | + private int bytesParsed = 0; |
153 | 155 | private final ArrayDeque<ReleasableBytesReference> unParsedChunks = new ArrayDeque<>(4); |
154 | 156 | private final ArrayList<DocWriteRequest<?>> items = new ArrayList<>(4); |
155 | 157 |
|
@@ -186,48 +188,61 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo |
186 | 188 |
|
187 | 189 | final BytesReference data; |
188 | 190 | int bytesConsumed; |
189 | | - try { |
190 | | - unParsedChunks.add(chunk); |
| 191 | + if (chunk.length() == 0) { |
| 192 | + chunk.close(); |
| 193 | + bytesConsumed = 0; |
| 194 | + } else { |
| 195 | + try { |
| 196 | + unParsedChunks.add(chunk); |
191 | 197 |
|
192 | | - if (unParsedChunks.size() > 1) { |
193 | | - data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0])); |
194 | | - } else { |
195 | | - data = chunk; |
196 | | - } |
| 198 | + if (unParsedChunks.size() > 1) { |
| 199 | + data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0])); |
| 200 | + } else { |
| 201 | + data = chunk; |
| 202 | + } |
197 | 203 |
|
198 | | - // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in |
199 | | - // BulkRequest#add is fine |
200 | | - bytesConsumed = parser.incrementalParse( |
201 | | - data, |
202 | | - defaultIndex, |
203 | | - defaultRouting, |
204 | | - defaultFetchSourceContext, |
205 | | - defaultPipeline, |
206 | | - defaultRequireAlias, |
207 | | - defaultRequireDataStream, |
208 | | - defaultListExecutedPipelines, |
209 | | - allowExplicitIndex, |
210 | | - request.getXContentType(), |
211 | | - (request, type) -> items.add(request), |
212 | | - items::add, |
213 | | - items::add, |
214 | | - isLast == false, |
215 | | - stringDeduplicator |
216 | | - ); |
| 204 | + // TODO: Check that the behavior here vs. globalRouting, globalPipeline, globalRequireAlias, globalRequireDatsStream in |
| 205 | + // BulkRequest#add is fine |
| 206 | + bytesConsumed = parser.incrementalParse( |
| 207 | + data, |
| 208 | + defaultIndex, |
| 209 | + defaultRouting, |
| 210 | + defaultFetchSourceContext, |
| 211 | + defaultPipeline, |
| 212 | + defaultRequireAlias, |
| 213 | + defaultRequireDataStream, |
| 214 | + defaultListExecutedPipelines, |
| 215 | + allowExplicitIndex, |
| 216 | + request.getXContentType(), |
| 217 | + (request, type) -> items.add(request), |
| 218 | + items::add, |
| 219 | + items::add, |
| 220 | + isLast == false, |
| 221 | + stringDeduplicator |
| 222 | + ); |
| 223 | + bytesParsed += bytesConsumed; |
217 | 224 |
|
218 | | - } catch (Exception e) { |
219 | | - shortCircuit(); |
220 | | - new RestToXContentListener<>(channel).onFailure(e); |
221 | | - return; |
| 225 | + } catch (Exception e) { |
| 226 | + shortCircuit(); |
| 227 | + new RestToXContentListener<>(channel).onFailure( |
| 228 | + new ElasticsearchParseException("could not parse bulk request body", e) |
| 229 | + ); |
| 230 | + return; |
| 231 | + } |
222 | 232 | } |
223 | 233 |
|
224 | 234 | final ArrayList<Releasable> releasables = accountParsing(bytesConsumed); |
225 | 235 | if (isLast) { |
226 | 236 | assert unParsedChunks.isEmpty(); |
227 | | - assert channel != null; |
228 | | - ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items); |
229 | | - items.clear(); |
230 | | - handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); |
| 237 | + if (bytesParsed == 0) { |
| 238 | + shortCircuit(); |
| 239 | + new RestToXContentListener<>(channel).onFailure(new ElasticsearchParseException("request body is required")); |
| 240 | + } else { |
| 241 | + assert channel != null; |
| 242 | + ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items); |
| 243 | + items.clear(); |
| 244 | + handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel)); |
| 245 | + } |
231 | 246 | } else if (items.isEmpty() == false) { |
232 | 247 | ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items); |
233 | 248 | items.clear(); |
|
0 commit comments