Add http request content stream support#111438
Add http request content stream support#111438mhl-b merged 17 commits intoelastic:partial-rest-requestsfrom
Conversation
|
Pinging @elastic/es-distributed (Team:Distributed) |
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
Show resolved
Hide resolved
DaveCTurner
left a comment
There was a problem hiding this comment.
I need to look at the details some more, but I left comments from an initial pass.
| } | ||
| }; | ||
|
|
||
| contentStream.setHandler(contentConsumer); // must setup handler before first request |
There was a problem hiding this comment.
I wonder, rather than another layer of indirection here could we add a chunk-consuming method to RestChannelConsumer? Or possibly return something from prepareRequest which implements both RestChannelConsumer and a new RequestBodyChunkConsumer method to indicate we want to process a streaming body?
There was a problem hiding this comment.
I added RequestBodyChunkConsumer
public interface RequestBodyChunkConsumer extends RestChannelConsumer {
void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast);
}and wiring for handler into body stream in handleRequest
if (request.isStreamedContent()) {
assert action instanceof RequestBodyChunkConsumer;
var chunkConsumer = (RequestBodyChunkConsumer) action;
request.contentStream().setHandler((chunk, isLast) -> chunkConsumer.handleChunk(channel, chunk, isLast));
}implementation looks like this
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
return new RequestBodyChunkConsumer() {
int totalBytes = 0;
@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
try (chunk) {
totalBytes += chunk.length();
if (isLast == false) {
request.contentStream().requestBytes(1024);
} else {
channel.sendResponse(new RestResponse(RestStatus.OK, Integer.toString(totalBytes)));
}
}
}
@Override
public void accept(RestChannel channel) throws Exception {
request.contentStream().requestBytes(1024); // ask for first chunk
}
};
}
...y4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestStreamIT.java
Outdated
Show resolved
Hide resolved
...y4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4HttpRequestStreamIT.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java
Outdated
Show resolved
Hide resolved
|
Sorry for being slow on the review. I started to read it and find that I need more time to even understand what is going on in this newer version. If @DaveCTurner and you can move fast, definitely no need to block on me. I still plan to review it which is also a learning opportunity for me. Thanks! |
| Unpooled.EMPTY_BUFFER, | ||
| request.headers(), | ||
| EmptyHttpHeaders.INSTANCE | ||
| ), |
There was a problem hiding this comment.
Are we just using a FullHttpRequest because this is a POC, and to minimise the change? HttpRequest would seem more appropriate here.
There was a problem hiding this comment.
to minimise the change?
Yes, it's one of the non-essential small things that touches many lines of code and bloat PR. Still need to address this, no reason to pass FullHttpRequest here.
ywangd
left a comment
There was a problem hiding this comment.
I had another look and understand it better now. Left a few comment including a semi-important one for potential breaking change for audit logs. Thanks!
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java
Outdated
Show resolved
Hide resolved
...ransport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java
Outdated
Show resolved
Hide resolved
| netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); | ||
| } else { | ||
| var contentStream = new Netty4HttpRequestBodyStream(ctx.channel()); | ||
| currentRequestStream = contentStream; |
There was a problem hiding this comment.
Similarly, can we assert currentRequestStream is null here or it can be non-null?
There was a problem hiding this comment.
It can be non-null if previous request was stream too. When we see HttpRequest that means we received all parts of previous request - all HttpContent's and single LastHttpContent. At this point previous parts should be either in previous stream queue or processed by rest handler.
There was a problem hiding this comment.
It can be non-null if previous request was stream too
We set currentRequestStream back to null on receiving LastHttpContent. Or do you mean that may not necessary happen for every streaming request?
There was a problem hiding this comment.
Right, sorry I lost track of my own changes :) In normal circumstances should be null. If request is not properly terminated (no last content) and there is new request then currentRequestStream might be not null, but it will end up with decoding failure and connection shutdown. I will add test.
There was a problem hiding this comment.
++ to a test, but could we also assert currentRequestStream == null here? It would be useful documentation if nothing else.
| netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); | ||
| handlePipelinedRequest(ctx, netty4HttpRequest); | ||
| } finally { | ||
| activityTracker.stopActivity(); |
There was a problem hiding this comment.
I think the activityTracker needs to include the else branch for stream process?
There was a problem hiding this comment.
Still figuring out activity tracker change. When there is enough data in channel and we turned off auto-read then next channel.read() will be executed right a way in same thread and stack. That means we hit activity tracker multiple times due to recursive calls in netty. It trips our assertions that thread is already active.
This recursion should not be an issue, since read-buffer is 64kb, and chunks would be about 8kb, so up to 8 recursive calls might happen.
There was a problem hiding this comment.
We need to make ActivityTracker reentrant for tracking write-path activity anyway, may as well do that here too:
public boolean maybeStartActivity() {
assert trackedThread == Thread.currentThread() : trackedThread.getName() + " vs " + Thread.currentThread().getName();
if (isIdle(get())) {
startActivity();
return true;
} else {
return false;
}
}
There was a problem hiding this comment.
After my changes in Netty4HttpRequestBodyStream that incorporated flow control and queue its no longer an issue. Moved start/end activity to it's original place. * scratching my head *
...ransport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
Outdated
Show resolved
Hide resolved
| if (httpRequest.body().isFull()) { | ||
| return httpRequest.body().asFull().bytes(); | ||
| } else { | ||
| return BytesArray.EMPTY; | ||
| } |
There was a problem hiding this comment.
Hmm this now has a problem with audit logs which can be configured to log requests body. It is not really advisable to do that for bulk requests. But we do see that from time to time. The streaming here now makes it impossible to log the request body. Unless we find a workaround, it would be a breaking change.
There was a problem hiding this comment.
It's possible to log with stream. You can wrap stream handlers one into another and do metering, logging, sniffing, even filtering. Might be not too friendly, pseudo-code:
currentHandler = httpRequest.body().contentStream().handler();
loggingBuffer = new SomeByteBuffer();
httpRequest.body().contentStream().setHandler((chunk, isLast) -> {
copyChunk = chunk.copy();
loggingBuffer.add(copyChunk);
if (isLast) {
log.info("content={}", loggingBuffer);
}
currentHandler.onNext(chunk, isLast);
});There was a problem hiding this comment.
Not sure how viable this would be with how auditing works today. The request payload is logged when the Rest handler is invoked. At this point, we simply don't have the full body.
There was a problem hiding this comment.
We can add a bit of logic into HttpAggregator, when logging is enabled do full aggregation. But it's confusing side effect of enabling logging. Yet not breaking.
...ransport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
Outdated
Show resolved
Hide resolved
...ternalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
Outdated
Show resolved
Hide resolved
Rename `xContent.streamSeparator()` and `RestHandler.supportsStreamContent()` to `xContent.bulkSeparator()` and `RestHandler.supportsBulkContent()`. I want to reserve use of "supportsStreamContent" for current work in HTTP layer to [support incremental content handling](#111438) besides fully aggregated byte buffers. `supportsStreamContent` would indicate that handler can parse chunks of http content as they arrive.
|
@DaveCTurner @ywangd @nicktindall @Tim-Brooks @henningandersen Remaining work from comments and discussions outside of PR:
There might be a cleaner path forward with HttpObjectAggregator removal. Use aggregation in rest handler with default behaviour - "aggregate all into single buffer", so all existing handlers wont notice the difference. There are other benefits of HttpObjectAggregator removal, such as dealing with 100/413/417 in correct pipelining order. Right now these responses might go back out-of-order, since aggregator is located before pipelining handler and not aware of it. We are getting very close or already there with rest handler interface. There is no slicing or aggregation in netty, ByteBuf chunks will flow to the rest handler. There is only queueing in netty to protect downstream from receiving not-requested chunks, in practice should be few chunks. The minimal set is new RequestBodyChunkConsumer(){
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
processChunk(chunk);
if (isLast == false) {
channel.request().contentStream().next();
}
}
} |
DaveCTurner
left a comment
There was a problem hiding this comment.
I think this is a good basis for the rest of the work in this area. I left a few superficial comments for now but they can be addressed in follow-ups.
| netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); | ||
| } else { | ||
| var contentStream = new Netty4HttpRequestBodyStream(ctx.channel()); | ||
| currentRequestStream = contentStream; |
There was a problem hiding this comment.
++ to a test, but could we also assert currentRequestStream == null here? It would be useful documentation if nothing else.
| var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); | ||
| assertNotNull("queue is empty", t); | ||
| return t; | ||
| } catch (InterruptedException e) { |
There was a problem hiding this comment.
nit: please restore interrupt status flag:
| } catch (InterruptedException e) { | |
| } catch (InterruptedException e) { | |
| Thread.currentThread().interrupt(); |
| for (int mb = 0; mb <= 50; mb += 10) { | ||
| var minBufSize = payloadSize - MBytes(10 + mb); | ||
| var maxBufSize = payloadSize - MBytes(mb); | ||
| assertBusy(() -> { |
There was a problem hiding this comment.
Do we need to assertBusy() here? If so, could you comment explaining why? I would expect that once the client's event loop is idle this assertion should pass straight away. Maybe there's no easy way to wait for the event loop to become idle tho?
| for (int i = 0; i < 5; i++) { | ||
| ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false)); | ||
| } | ||
| ctx.clientChannel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); |
There was a problem hiding this comment.
Can we assert that this future's isDone() returns false because of the backpressure?
There was a problem hiding this comment.
I will add assertion, but I dont think it's a reliable indicator of backpressure. Even without backpressure, flushing 50MB might return false immediately, flush is not a blocking operation and there might be a few flushes to kernel buffer before netty's channel buffer will be drained.
|
|
||
| static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) { | ||
| var req = new DefaultHttpRequest(HTTP_1_1, POST, uri); | ||
| req.headers().add(CONTENT_LENGTH, contentLength); |
There was a problem hiding this comment.
Could we also have some chunked requests which don't specify the content-length up front?
|
|
||
| static final String ROUTE = "/_test/request-stream"; | ||
|
|
||
| static final ConcurrentHashMap<String, ServerRequestHandler> handlers = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
I expect there'll only be one active handler at once, so I think we could use that to simplify things along the lines of org.elasticsearch.rest.ChunkedZipResponseIT.RandomZipResponsePlugin#responseRef.
There was a problem hiding this comment.
Suppose to be used by pipelining tests where multiple requests in fly. I havent added test yet.
Rename `xContent.streamSeparator()` and `RestHandler.supportsStreamContent()` to `xContent.bulkSeparator()` and `RestHandler.supportsBulkContent()`. I want to reserve use of "supportsStreamContent" for current work in HTTP layer to [support incremental content handling](elastic#111438) besides fully aggregated byte buffers. `supportsStreamContent` would indicate that handler can parse chunks of http content as they arrive.
Rename `xContent.streamSeparator()` and `RestHandler.supportsStreamContent()` to `xContent.bulkSeparator()` and `RestHandler.supportsBulkContent()`. I want to reserve use of "supportsStreamContent" for current work in HTTP layer to [support incremental content handling](elastic#111438) besides fully aggregated byte buffers. `supportsStreamContent` would indicate that handler can parse chunks of http content as they arrive.
This commit back ports all of the work introduced in: #113044 * #111438 - 5e1f655 * #111865 - 478baf1 * #112179 - 1b77421 * #112227 - cbcbc34 * #112267 - c00768a * #112154 - a03fb12 * #112479 - 95b42a7 * #112608 - ce2d648 * #112629 - 0d55dc6 * #112767 - 2dbbd7d * #112724 - 58e3a39 * dce8a0b * #112974 - 92daeeb * 529d349 * #113161 - e3424bd
This PR adds support of streamed content to RestRequest. Major changes:
RestRequestandHttpRequestfromBytesReferenceto new interfaceHttpBodyHttpBodyhas 2 sub interfacesFullfor fully aggregated requests andStreamfor lazy readersNetty4HttpRequestContentStreamHttpObjectAggregator, wrapped into new classNetty4HttpAggregator