Skip to content

Commit 5e1f655

Browse files
mhl-bTim-Brooks
authored andcommitted
Add http request content stream support (#111438)
1 parent b94720d commit 5e1f655

21 files changed

Lines changed: 953 additions & 48 deletions

File tree

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

Lines changed: 436 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http.netty4;
10+
11+
import io.netty.handler.codec.http.HttpObjectAggregator;
12+
import io.netty.handler.codec.http.HttpRequest;
13+
14+
import org.elasticsearch.http.HttpPreRequest;
15+
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
16+
17+
import java.util.function.Predicate;
18+
19+
public class Netty4HttpAggregator extends HttpObjectAggregator {
20+
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
21+
22+
private final Predicate<HttpPreRequest> decider;
23+
private boolean shouldAggregate;
24+
25+
public Netty4HttpAggregator(int maxContentLength) {
26+
this(maxContentLength, IGNORE_TEST);
27+
}
28+
29+
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
30+
super(maxContentLength);
31+
this.decider = decider;
32+
}
33+
34+
@Override
35+
public boolean acceptInboundMessage(Object msg) throws Exception {
36+
if (msg instanceof HttpRequest request) {
37+
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
38+
shouldAggregate = decider.test(preReq);
39+
}
40+
if (shouldAggregate) {
41+
return super.acceptInboundMessage(msg);
42+
} else {
43+
return false;
44+
}
45+
}
46+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import io.netty.handler.codec.http.DefaultHttpResponse;
2121
import io.netty.handler.codec.http.DefaultLastHttpContent;
2222
import io.netty.handler.codec.http.FullHttpRequest;
23+
import io.netty.handler.codec.http.HttpContent;
2324
import io.netty.handler.codec.http.HttpObject;
25+
import io.netty.handler.codec.http.HttpRequest;
2426
import io.netty.handler.codec.http.HttpResponse;
27+
import io.netty.handler.codec.http.LastHttpContent;
2528
import io.netty.handler.ssl.SslCloseCompletionEvent;
2629
import io.netty.util.ReferenceCountUtil;
2730
import io.netty.util.concurrent.Future;
@@ -71,6 +74,12 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu
7174
@Nullable
7275
private ChunkedWrite currentChunkedWrite;
7376

77+
/**
78+
* HTTP request content stream for current request, it's null if there is no current request or request is fully-aggregated
79+
*/
80+
@Nullable
81+
private Netty4HttpRequestBodyStream currentRequestStream;
82+
7483
/*
7584
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
7685
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
@@ -110,23 +119,38 @@ public Netty4HttpPipeliningHandler(
110119
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
111120
activityTracker.startActivity();
112121
try {
113-
assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]";
114-
final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
115-
final Netty4HttpRequest netty4HttpRequest;
116-
if (fullHttpRequest.decoderResult().isFailure()) {
117-
final Throwable cause = fullHttpRequest.decoderResult().cause();
118-
final Exception nonError;
119-
if (cause instanceof Error) {
120-
ExceptionsHelper.maybeDieOnAnotherThread(cause);
121-
nonError = new Exception(cause);
122+
if (msg instanceof HttpRequest request) {
123+
final Netty4HttpRequest netty4HttpRequest;
124+
if (request.decoderResult().isFailure()) {
125+
final Throwable cause = request.decoderResult().cause();
126+
final Exception nonError;
127+
if (cause instanceof Error) {
128+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
129+
nonError = new Exception(cause);
130+
} else {
131+
nonError = (Exception) cause;
132+
}
133+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
122134
} else {
123-
nonError = (Exception) cause;
135+
assert currentRequestStream == null : "current stream must be null for new request";
136+
if (request instanceof FullHttpRequest fullHttpRequest) {
137+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
138+
currentRequestStream = null;
139+
} else {
140+
var contentStream = new Netty4HttpRequestBodyStream(ctx.channel());
141+
currentRequestStream = contentStream;
142+
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
143+
}
124144
}
125-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError);
145+
handlePipelinedRequest(ctx, netty4HttpRequest);
126146
} else {
127-
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
147+
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
148+
assert currentRequestStream != null : "current stream must exists before handling http content";
149+
currentRequestStream.handleNettyContent((HttpContent) msg);
150+
if (msg instanceof LastHttpContent) {
151+
currentRequestStream = null;
152+
}
128153
}
129-
handlePipelinedRequest(ctx, netty4HttpRequest);
130154
} finally {
131155
activityTracker.stopActivity();
132156
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.netty.buffer.ByteBuf;
1313
import io.netty.buffer.Unpooled;
1414
import io.netty.handler.codec.http.DefaultFullHttpRequest;
15+
import io.netty.handler.codec.http.EmptyHttpHeaders;
1516
import io.netty.handler.codec.http.FullHttpRequest;
1617
import io.netty.handler.codec.http.HttpHeaderNames;
1718
import io.netty.handler.codec.http.HttpHeaders;
@@ -21,6 +22,7 @@
2122
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
2223

2324
import org.elasticsearch.common.bytes.BytesReference;
25+
import org.elasticsearch.http.HttpBody;
2426
import org.elasticsearch.http.HttpRequest;
2527
import org.elasticsearch.http.HttpResponse;
2628
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
@@ -40,22 +42,40 @@
4042
public class Netty4HttpRequest implements HttpRequest {
4143

4244
private final FullHttpRequest request;
43-
private final BytesReference content;
45+
private final HttpBody content;
4446
private final Map<String, List<String>> headers;
4547
private final AtomicBoolean released;
4648
private final Exception inboundException;
4749
private final boolean pooled;
4850
private final int sequence;
4951

52+
Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
53+
this(
54+
sequence,
55+
new DefaultFullHttpRequest(
56+
request.protocolVersion(),
57+
request.method(),
58+
request.uri(),
59+
Unpooled.EMPTY_BUFFER,
60+
request.headers(),
61+
EmptyHttpHeaders.INSTANCE
62+
),
63+
new AtomicBoolean(false),
64+
false,
65+
contentStream,
66+
null
67+
);
68+
}
69+
5070
Netty4HttpRequest(int sequence, FullHttpRequest request) {
51-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()));
71+
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
5272
}
5373

5474
Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
55-
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()), inboundException);
75+
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
5676
}
5777

58-
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, BytesReference content) {
78+
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
5979
this(sequence, request, released, pooled, content, null);
6080
}
6181

@@ -64,7 +84,7 @@ private Netty4HttpRequest(
6484
FullHttpRequest request,
6585
AtomicBoolean released,
6686
boolean pooled,
67-
BytesReference content,
87+
HttpBody content,
6888
Exception inboundException
6989
) {
7090
this.sequence = sequence;
@@ -87,7 +107,7 @@ public String uri() {
87107
}
88108

89109
@Override
90-
public BytesReference content() {
110+
public HttpBody body() {
91111
assert released.get() == false;
92112
return content;
93113
}
@@ -119,7 +139,7 @@ public HttpRequest releaseAndCopy() {
119139
),
120140
new AtomicBoolean(false),
121141
false,
122-
Netty4Utils.toBytesReference(copiedContent)
142+
content
123143
);
124144
} finally {
125145
release();
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.http.netty4;
10+
11+
import io.netty.channel.Channel;
12+
import io.netty.handler.codec.http.HttpContent;
13+
import io.netty.handler.codec.http.LastHttpContent;
14+
15+
import org.elasticsearch.http.HttpBody;
16+
import org.elasticsearch.transport.netty4.Netty4Utils;
17+
18+
import java.util.ArrayDeque;
19+
import java.util.Queue;
20+
21+
/**
22+
* Netty based implementation of {@link HttpBody.Stream}.
23+
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
24+
* to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite
25+
* autoRead=off. In this case chunks will be queued until downstream calls {@link Stream#next()}
26+
*/
27+
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
28+
29+
private final Channel channel;
30+
private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
31+
private boolean requested = false;
32+
private boolean hasLast = false;
33+
private HttpBody.ChunkHandler handler;
34+
35+
public Netty4HttpRequestBodyStream(Channel channel) {
36+
this.channel = channel;
37+
channel.closeFuture().addListener((f) -> releaseQueuedChunks());
38+
channel.config().setAutoRead(false);
39+
}
40+
41+
@Override
42+
public ChunkHandler handler() {
43+
return handler;
44+
}
45+
46+
@Override
47+
public void setHandler(ChunkHandler chunkHandler) {
48+
this.handler = chunkHandler;
49+
}
50+
51+
private void sendQueuedOrRead() {
52+
assert channel.eventLoop().inEventLoop();
53+
requested = true;
54+
var chunk = chunkQueue.poll();
55+
if (chunk == null) {
56+
channel.read();
57+
} else {
58+
sendChunk(chunk);
59+
}
60+
}
61+
62+
@Override
63+
public void next() {
64+
assert handler != null : "handler must be set before requesting next chunk";
65+
if (channel.eventLoop().inEventLoop()) {
66+
sendQueuedOrRead();
67+
} else {
68+
channel.eventLoop().submit(this::sendQueuedOrRead);
69+
}
70+
}
71+
72+
public void handleNettyContent(HttpContent httpContent) {
73+
assert handler != null : "handler must be set before processing http content";
74+
if (requested && chunkQueue.isEmpty()) {
75+
sendChunk(httpContent);
76+
} else {
77+
chunkQueue.add(httpContent);
78+
}
79+
if (httpContent instanceof LastHttpContent) {
80+
hasLast = true;
81+
channel.config().setAutoRead(true);
82+
}
83+
}
84+
85+
// visible for test
86+
Channel channel() {
87+
return channel;
88+
}
89+
90+
// visible for test
91+
Queue<HttpContent> chunkQueue() {
92+
return chunkQueue;
93+
}
94+
95+
// visible for test
96+
boolean hasLast() {
97+
return hasLast;
98+
}
99+
100+
private void sendChunk(HttpContent httpContent) {
101+
assert requested;
102+
requested = false;
103+
var bytesRef = Netty4Utils.toReleasableBytesReference(httpContent.content());
104+
var isLast = httpContent instanceof LastHttpContent;
105+
handler.onNext(bytesRef, isLast);
106+
}
107+
108+
private void releaseQueuedChunks() {
109+
while (chunkQueue.isEmpty() == false) {
110+
chunkQueue.poll().release();
111+
}
112+
}
113+
114+
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
366366
);
367367
}
368368
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
369-
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.maxContentLength());
369+
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
370370
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
371371
ch.pipeline()
372372
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import org.elasticsearch.action.ActionListener;
2828
import org.elasticsearch.common.bytes.BytesArray;
2929
import org.elasticsearch.common.bytes.BytesReference;
30+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3031
import org.elasticsearch.common.recycler.Recycler;
3132
import org.elasticsearch.common.settings.Settings;
3233
import org.elasticsearch.common.util.concurrent.EsExecutors;
3334
import org.elasticsearch.core.Booleans;
3435
import org.elasticsearch.core.SuppressForbidden;
36+
import org.elasticsearch.http.HttpBody;
3537
import org.elasticsearch.transport.TransportException;
3638

3739
import java.io.IOException;
@@ -128,6 +130,14 @@ public static BytesReference toBytesReference(final ByteBuf buffer) {
128130
}
129131
}
130132

133+
public static ReleasableBytesReference toReleasableBytesReference(final ByteBuf buffer) {
134+
return new ReleasableBytesReference(toBytesReference(buffer), buffer::release);
135+
}
136+
137+
public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) {
138+
return new HttpBody.ByteRefHttpBody(toBytesReference(buf));
139+
}
140+
131141
public static Recycler<BytesRef> createRecycler(Settings settings) {
132142
// If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals
133143
// setting the processors. We must do it ourselves first just in case.

0 commit comments

Comments
 (0)