Add http request content stream support #111438

Merged
mhl-b merged 17 commits into elastic:partial-rest-requests from mhl-b:partial-rest-content
Aug 13, 2024
@mhl-b
Contributor

@mhl-b mhl-b commented Jul 30, 2024

This PR adds support for streamed content to RestRequest. Major changes:

  • Change the type of content in RestRequest and HttpRequest from BytesReference to a new interface, HttpBody
  • HttpBody has two sub-interfaces: Full for fully aggregated requests and Stream for lazy readers
  • Add a Netty-based implementation of streamed content, Netty4HttpRequestContentStream
  • Add an aggregation decider to HttpObjectAggregator, wrapped in a new class, Netty4HttpAggregator
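
For orientation, the Full/Stream split can be pictured roughly as below. This is a simplified sketch, not the code from this PR: the names isFull, asFull and bytes mirror calls quoted further down in this conversation, while isStream, asStream, ChunkHandler, next and the use of byte[] in place of BytesReference are assumptions made for illustration.

```java
final class HttpBodySketch {

    /** Hypothetical stand-in for the HttpBody interface described above. */
    interface HttpBody {
        default boolean isFull() {
            return this instanceof Full;
        }

        default boolean isStream() {
            return this instanceof Stream;
        }

        default Full asFull() {
            if (this instanceof Full) {
                return (Full) this;
            }
            throw new IllegalStateException("body is a stream, not fully aggregated");
        }

        default Stream asStream() {
            if (this instanceof Stream) {
                return (Stream) this;
            }
            throw new IllegalStateException("body is fully aggregated, not a stream");
        }

        /** Fully aggregated body: all bytes are available at once. */
        interface Full extends HttpBody {
            byte[] bytes();
        }

        /** Lazily read body: chunks are delivered to a handler on demand. */
        interface Stream extends HttpBody {
            void setHandler(ChunkHandler handler);

            void next();
        }
    }

    /** Assumed callback shape: one chunk plus a flag marking the final chunk. */
    interface ChunkHandler {
        void onNext(byte[] chunk, boolean isLast);
    }

    /** Code that needs the whole body can only serve the Full case. */
    static byte[] contentOrEmpty(HttpBody body) {
        return body.isFull() ? body.asFull().bytes() : new byte[0];
    }
}
```

Callers that previously read a BytesReference directly would have to branch on the body kind, which is why existing code that expects full content needs review.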

@elasticsearchmachine added the needs:triage (Requires assignment of a team area label) label Jul 30, 2024
@mhl-b added the >enhancement, :Distributed/Network (Http and internode communication implementations), Team:Distributed (Meta label for distributed team) and v8.16.0 labels and removed the needs:triage label Jul 30, 2024
@mhl-b mhl-b requested review from DaveCTurner and ywangd July 30, 2024 02:21
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

Member

@DaveCTurner DaveCTurner left a comment

I need to look at the details some more, but I left comments from an initial pass.

}
};

contentStream.setHandler(contentConsumer); // must setup handler before first request
Member

I wonder, rather than another layer of indirection here could we add a chunk-consuming method to RestChannelConsumer? Or possibly return something from prepareRequest which implements both RestChannelConsumer and a new RequestBodyChunkConsumer method to indicate we want to process a streaming body?

Contributor Author

I added RequestBodyChunkConsumer

    public interface RequestBodyChunkConsumer extends RestChannelConsumer {
        void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast);
    }

and wired the handler into the body stream in handleRequest:

if (request.isStreamedContent()) {
  assert action instanceof RequestBodyChunkConsumer;
  var chunkConsumer = (RequestBodyChunkConsumer) action;
  request.contentStream().setHandler((chunk, isLast) -> chunkConsumer.handleChunk(channel, chunk, isLast));
}

An implementation looks like this:

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
    return new RequestBodyChunkConsumer() {

        int totalBytes = 0;

        @Override
        public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
            try (chunk) {
                totalBytes += chunk.length();
                if (isLast == false) {
                    request.contentStream().requestBytes(1024);
                } else {
                    channel.sendResponse(new RestResponse(RestStatus.OK, Integer.toString(totalBytes)));
                }
            }
        }

        @Override
        public void accept(RestChannel channel) throws Exception {
            request.contentStream().requestBytes(1024); // ask for first chunk
        }
    };
}
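
The request/handle loop above can be exercised without netty. The following is a minimal, self-contained sketch of the same pull-based protocol; InMemoryStream, ChunkHandler and countBytes are hypothetical names, and byte[] stands in for ReleasableBytesReference, so nothing here is the PR's actual API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class PullStreamSketch {

    interface ChunkHandler {
        void onNext(byte[] chunk, boolean isLast);
    }

    /** Hypothetical in-memory body stream: hands out exactly one chunk per next() call. */
    static final class InMemoryStream {
        private final Deque<byte[]> pending;
        private ChunkHandler handler;

        InMemoryStream(List<byte[]> chunks) {
            this.pending = new ArrayDeque<>(chunks);
        }

        void setHandler(ChunkHandler handler) {
            this.handler = handler;
        }

        void next() {
            if (handler == null) {
                throw new IllegalStateException("handler must be set before the first request");
            }
            byte[] chunk = pending.poll();
            if (chunk != null) {
                // the chunk is the last one when nothing else is queued
                handler.onNext(chunk, pending.isEmpty());
            }
        }
    }

    /** Counts body bytes chunk by chunk, asking for the next chunk only after handling the current one. */
    static int countBytes(List<byte[]> chunks) {
        InMemoryStream stream = new InMemoryStream(chunks);
        int[] total = {0};
        stream.setHandler((chunk, isLast) -> {
            total[0] += chunk.length;
            if (isLast == false) {
                stream.next();
            }
        });
        stream.next(); // ask for the first chunk
        return total[0];
    }
}
```

The consumer never receives a chunk it did not ask for, which is the property the real stream is meant to guarantee towards the rest handler.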

@ywangd
Member

ywangd commented Jul 31, 2024

Sorry for being slow on the review. I started to read it and found that I need more time to even understand what is going on in this newer version. If you and @DaveCTurner can move fast, there is definitely no need to block on me. I still plan to review it, which is also a learning opportunity for me. Thanks!

Unpooled.EMPTY_BUFFER,
request.headers(),
EmptyHttpHeaders.INSTANCE
),
Contributor

Are we just using a FullHttpRequest because this is a POC, and to minimise the change? HttpRequest would seem more appropriate here.

Contributor Author

to minimise the change?

Yes, it's one of those non-essential small things that touch many lines of code and bloat the PR. I still need to address this; there is no reason to pass FullHttpRequest here.

Member

@ywangd ywangd left a comment

I had another look and understand it better now. I left a few comments, including a semi-important one about a potential breaking change for audit logs. Thanks!

netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
} else {
var contentStream = new Netty4HttpRequestBodyStream(ctx.channel());
currentRequestStream = contentStream;
Member

Similarly, can we assert that currentRequestStream is null here, or can it be non-null?

Contributor Author

@mhl-b mhl-b Aug 5, 2024

It can be non-null if the previous request was a stream too. When we see an HttpRequest, that means we have received all parts of the previous request: all the HttpContent parts and a single LastHttpContent. At this point the previous parts should either be in the previous stream's queue or have been processed by the rest handler.

Member

It can be non-null if previous request was stream too

We set currentRequestStream back to null on receiving LastHttpContent. Or do you mean that this may not necessarily happen for every streaming request?

Contributor Author

Right, sorry, I lost track of my own changes :) In normal circumstances it should be null. If a request is not properly terminated (no last content) and a new request arrives, then currentRequestStream might be non-null, but that will end in a decoding failure and connection shutdown. I will add a test.

Member

++ to a test, but could we also assert currentRequestStream == null here? It would be useful documentation if nothing else.

netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
handlePipelinedRequest(ctx, netty4HttpRequest);
} finally {
activityTracker.stopActivity();
Member

I think the activityTracker needs to cover the else branch for stream processing as well?

Contributor Author

@mhl-b mhl-b Aug 5, 2024

I'm still figuring out the activity tracker change. When there is enough data in the channel and we have turned off auto-read, the next channel.read() is executed right away in the same thread and stack. That means we hit the activity tracker multiple times due to recursive calls in netty, which trips our assertion that the thread is already active.
This recursion should not be an issue: the read buffer is 64kb and chunks are about 8kb, so up to 8 recursive calls might happen.

Member

We need to make ActivityTracker reentrant for tracking write-path activity anyway, may as well do that here too:

public boolean maybeStartActivity() {
    assert trackedThread == Thread.currentThread() : trackedThread.getName() + " vs " + Thread.currentThread().getName();
    if (isIdle(get())) {
        startActivity();
        return true;
    } else {
        return false;
    }
}
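
A caller would presumably pair maybeStartActivity() with a conditional stop, so that only the outermost frame of a recursive read ends the activity. The sketch below shows that pattern with a hypothetical Tracker that uses a plain boolean; it is not the real ActivityTracker, and the read method only imitates the recursive channel.read() calls discussed above.

```java
final class ReentrantTrackerSketch {

    /** Hypothetical single-threaded tracker: the start is a no-op when already active. */
    static final class Tracker {
        private boolean active;
        private int activations;

        /** Returns true only if this call moved the tracker from idle to active. */
        boolean maybeStartActivity() {
            if (active) {
                return false;
            }
            active = true;
            activations++;
            return true;
        }

        void stopActivity() {
            if (active == false) {
                throw new IllegalStateException("not active");
            }
            active = false;
        }

        boolean isActive() {
            return active;
        }

        int activations() {
            return activations;
        }
    }

    /** Imitates nested reads on one thread: every frame tries to start, only the outermost stops. */
    static void read(Tracker tracker, int remainingDepth) {
        boolean started = tracker.maybeStartActivity();
        try {
            if (remainingDepth > 0) {
                read(tracker, remainingDepth - 1);
            }
        } finally {
            if (started) {
                tracker.stopActivity();
            }
        }
    }
}
```

With this shape, eight nested reads count as a single activity and leave the tracker idle afterwards.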

Contributor Author

After my changes in Netty4HttpRequestBodyStream that incorporated flow control and a queue, it's no longer an issue. I moved start/end activity back to its original place. * scratching my head *

Comment on lines +295 to +299
if (httpRequest.body().isFull()) {
return httpRequest.body().asFull().bytes();
} else {
return BytesArray.EMPTY;
}
Member

Hmm, this now has a problem with audit logs, which can be configured to log request bodies. It is not really advisable to do that for bulk requests, but we do see it from time to time. Streaming here makes it impossible to log the request body. Unless we find a workaround, it would be a breaking change.

Contributor Author

@mhl-b mhl-b Aug 5, 2024

It's possible to log with a stream. You can wrap stream handlers one inside another and do metering, logging, sniffing, even filtering. It might not be too friendly; pseudo-code:

currentHandler = httpRequest.body().contentStream().handler();
loggingBuffer = new SomeByteBuffer(); 
httpRequest.body().contentStream().setHandler((chunk, isLast) -> {
  copyChunk = chunk.copy();
  loggingBuffer.add(copyChunk);
  if (isLast) {
    log.info("content={}", loggingBuffer);
  }
  currentHandler.onNext(chunk, isLast);
});
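
The wrapping idea in the pseudo-code can be written as a plain decorator. This is a self-contained sketch under assumptions: ChunkHandler and withBodyLogging are hypothetical names, byte[] replaces the real chunk type, and the log sink is a simple Consumer<String> instead of a logger.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class LoggingHandlerSketch {

    interface ChunkHandler {
        void onNext(byte[] chunk, boolean isLast);
    }

    /** Wraps a handler so that every chunk is copied aside and the whole body is logged on the last chunk. */
    static ChunkHandler withBodyLogging(ChunkHandler delegate, Consumer<String> log) {
        ByteArrayOutputStream copy = new ByteArrayOutputStream();
        return (chunk, isLast) -> {
            copy.write(chunk, 0, chunk.length); // this overload declares no checked exception
            if (isLast) {
                log.accept("content=" + copy.toString(StandardCharsets.UTF_8));
            }
            delegate.onNext(chunk, isLast); // the wrapped handler still sees every chunk
        };
    }
}
```

Note that the copy grows to the full body size, so a wrapper like this gives up the memory benefit of streaming for any request it logs, and the log line is only emitted once the last chunk has arrived.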

Member

Not sure how viable this would be with how auditing works today. The request payload is logged when the Rest handler is invoked. At this point, we simply don't have the full body.

Contributor Author

@mhl-b mhl-b Aug 9, 2024

We can add a bit of logic to HttpAggregator: when logging is enabled, do full aggregation. It's a confusing side effect of enabling logging, but it is not breaking.

@mhl-b mhl-b requested review from a team as code owners August 8, 2024 01:42
@mhl-b mhl-b removed request for a team August 8, 2024 01:44
elasticsearchmachine pushed a commit that referenced this pull request Aug 8, 2024
Rename `xContent.streamSeparator()` and
`RestHandler.supportsStreamContent()` to `xContent.bulkSeparator()` and
`RestHandler.supportsBulkContent()`.

I want to reserve use of "supportsStreamContent" for current work in
HTTP layer to [support incremental content
handling](#111438) besides
fully aggregated byte buffers. `supportsStreamContent` would indicate
that handler can parse chunks of http content as they arrive.
@mhl-b
Contributor Author

mhl-b commented Aug 10, 2024

@DaveCTurner @ywangd @nicktindall @Tim-Brooks @henningandersen
A summary of the current state. Below is a list of the tasks and conclusions identified so far.
If there are no major concerns with the current code, I propose to merge it into the feature branch.

Remaining work from comments and discussions outside of PR:

  • 100-continue and 413/417 for oversized content. Since I bypass HttpObjectAggregator I need to handle them explicitly
  • request body logging for audit trail. Might be a painful one
  • request body metrics in HttpClientStatsTracker
  • content length estimation for circuit breaker
  • connect rest controller and http aggregator to decide which routes can bypass aggregation (or remove aggregator)
  • redundancy in Netty4HttpRequest that requires FullHttpRequest
  • control flow before/after decompression?
  • other places that expect full content?

There might be a cleaner path forward that removes HttpObjectAggregator: do the aggregation in the rest handler, with the default behaviour being "aggregate all into a single buffer", so existing handlers won't notice the difference. There are other benefits to removing HttpObjectAggregator, such as dealing with 100/413/417 in the correct pipelining order. Right now these responses might go back out of order, since the aggregator is located before the pipelining handler and is not aware of it.

We are getting very close to, or are already there with, the rest handler interface. There is no slicing or aggregation in netty; ByteBuf chunks flow to the rest handler. There is only queueing in netty to protect downstream from receiving chunks it did not request; in practice this should be a few chunks.

The minimal set is stream.next() to request the next network buffer and void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) to process a chunk. More complex logic can be composed on top of it, like bulk content slicing based on a separator.

new RequestBodyChunkConsumer(){
    public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
        processChunk(chunk);
        if (isLast == false) {
            channel.request().contentStream().next();
        }
    }
}
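
The default "aggregate all into a single buffer" behaviour mentioned above could be layered on the same two operations. A minimal sketch, assuming a hypothetical ChunkHandler callback, a Runnable standing in for stream.next(), and byte[] in place of the real chunk type:

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

final class AggregatingConsumerSketch {

    interface ChunkHandler {
        void onNext(byte[] chunk, boolean isLast);
    }

    /**
     * Builds a handler that buffers every chunk, keeps requesting the next one,
     * and hands the complete body to onFullBody once the last chunk arrives.
     */
    static ChunkHandler aggregating(Runnable requestNext, Consumer<byte[]> onFullBody) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        return (chunk, isLast) -> {
            buffer.write(chunk, 0, chunk.length);
            if (isLast) {
                onFullBody.accept(buffer.toByteArray());
            } else {
                requestNext.run(); // stand-in for stream.next()
            }
        };
    }
}
```

A handler that does not care about streaming would only ever see the onFullBody callback, which is how existing handlers could stay unchanged under that proposal.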

Member

@DaveCTurner DaveCTurner left a comment

I think this is a good basis for the rest of the work in this area. I left a few superficial comments for now but they can be addressed in follow-ups.

var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertNotNull("queue is empty", t);
return t;
} catch (InterruptedException e) {
Member

nit: please restore interrupt status flag:

Suggested change
- } catch (InterruptedException e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
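
For context, here is the suggested pattern as a complete helper. It is a sketch only: pollOrFail and its timeout parameter are hypothetical, and plain AssertionError replaces the test framework's assertNotNull.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class InterruptSketch {

    /** Waits for the next queue element, restoring the interrupt flag if the wait is interrupted. */
    static <T> T pollOrFail(BlockingQueue<T> queue, long timeoutSeconds) {
        try {
            T t = queue.poll(timeoutSeconds, TimeUnit.SECONDS);
            if (t == null) {
                throw new AssertionError("queue is empty");
            }
            return t;
        } catch (InterruptedException e) {
            // poll() cleared the flag when it threw; set it again so callers can still observe the interrupt
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
    }
}
```

Without the interrupt() call, code further up the stack would have no way to tell that the thread was asked to stop.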

for (int mb = 0; mb <= 50; mb += 10) {
var minBufSize = payloadSize - MBytes(10 + mb);
var maxBufSize = payloadSize - MBytes(mb);
assertBusy(() -> {
Member

Do we need to assertBusy() here? If so, could you add a comment explaining why? I would expect that once the client's event loop is idle this assertion should pass straight away. Maybe there's no easy way to wait for the event loop to become idle, though?

for (int i = 0; i < 5; i++) {
ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false));
}
ctx.clientChannel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
Member

Can we assert that this future's isDone() returns false because of the backpressure?

Contributor Author

I will add the assertion, but I don't think it's a reliable indicator of backpressure. Even without backpressure, isDone() might return false immediately after flushing 50MB: flush is not a blocking operation, and there might be a few flushes to the kernel buffer before netty's channel buffer is drained.


static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) {
var req = new DefaultHttpRequest(HTTP_1_1, POST, uri);
req.headers().add(CONTENT_LENGTH, contentLength);
Member

Could we also have some chunked requests which don't specify the content-length up front?


static final String ROUTE = "/_test/request-stream";

static final ConcurrentHashMap<String, ServerRequestHandler> handlers = new ConcurrentHashMap<>();
Member

I expect there'll only be one active handler at once, so I think we could use that to simplify things along the lines of org.elasticsearch.rest.ChunkedZipResponseIT.RandomZipResponsePlugin#responseRef.

Contributor Author

It's supposed to be used by pipelining tests where multiple requests are in flight. I haven't added that test yet.

@mhl-b mhl-b merged commit d3591ef into elastic:partial-rest-requests Aug 13, 2024
cbuescher pushed a commit to cbuescher/elasticsearch that referenced this pull request Sep 4, 2024
davidkyle pushed a commit to davidkyle/elasticsearch that referenced this pull request Sep 5, 2024
Tim-Brooks pushed a commit to Tim-Brooks/elasticsearch that referenced this pull request Sep 19, 2024
Tim-Brooks added a commit that referenced this pull request Sep 20, 2024
Labels

:Distributed/Network (Http and internode communication implementations), >enhancement, Team:Distributed (Meta label for distributed team), v8.16.0

6 participants