[EventEngine] Windows Endpoint: optimize reads by chaining synchronous WSARecv operations#32563
drfloob merged 7 commits into grpc:master from
Conversation
```cpp
    SliceBuffer* buffer, const ReadArgs* /* args */) {
  GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this);
  if (io_state_->socket->IsShutdown()) {
    executor_->Run([on_read = std::move(on_read)]() mutable {
```
Moving the AnyInvocable requires it. The error with mutable removed:
src/core/lib/event_engine/windows/windows_endpoint.cc(132,7): error: no matching function for call to object of type 'const remove_reference_t<absl::AnyInvocable<void (absl::Status)> &>' (aka 'const absl::AnyInvocable<void (absl::Status)>')
on_read(absl::UnavailableError("Socket is shutting down."));
TIL: it seems one cannot call a non-const method on a const `absl::AnyInvocable<>`. I guess something like `absl::AnyInvocable<void (absl::Status) const>` would also work, but that would change the EE interface and probably change the callback's meaning too.
```cpp
    });
    return false;
  }
  buffer->Clear();
```
Question: does this mean the caller of the endpoint should provide an empty SliceBuffer, but in some cases the buffer might not be empty (maybe this simplifies the caller?), hence the Clear here?
That's right. Ideally the buffer should be empty. It would be great if any pre-existing slices in the buffer could be reused, but there's no such contract, and it may be the case that they are in use elsewhere and simply not cleaned out of the SliceBuffer.
That said, I would love to change the EventEngine::Read contract to say that any slices in the buffer are now donated to the cause, it could save some allocations. But we'd want to investigate all call sites to see if there is any value in doing so, and we may introduce bugs doing this if slices are left in unintentionally - I don't want to attempt it yet.
```cpp
  // Choose an appropriate size.
  size_t min_read_size = kDefaultTargetReadSize;
  if (buffer->Length() < min_read_size && buffer->Count() < kMaxWSABUFCount) {
    buffer->AppendIndexed(Slice(allocator_.MakeSlice(min_read_size)));
```
Pedantic: should this be `allocator_.MakeSlice(min_read_size - buffer->Length())`?
I'm not sure if it should, honestly. As it's currently written, the buffer size could end up being between min_read_size and (2 * min_read_size) - 1. If we only allocate up to min_read_size, we may end up doing more allocations over the life of the process because we've only allocated the minimum. It is a tradeoff between memory and cpu utilization, especially in applications with a lot of open endpoints. I'd wager that there aren't many low-memory Windows systems that gRPC is running on, so I'd opt for a larger memory footprint here.
Gotcha, interesting. Thanks for the explanation!
```cpp
      io_state_->handle_read_event.ExecuteCallbackAndReset(status);
    });
  }
  return false;
```
No action required: it seems Read always returns false, so this change makes the API async in all cases, which now matches the spec:
grpc/include/grpc/event_engine/event_engine.h
Lines 184 to 185 in 97ba987
That's right. Other implementations needed this optimization, and the Windows EE can take advantage of it if we end up needing it. There aren't any Windows benchmarks that I'm aware of, so deciding when and what to optimize on the platform is a bit fuzzy. For this PR, the performance difference was stark and the improvement was necessary: one specific test timed out with the EE, whereas the iomgr version finished in < 2s.
Internal checks passed, merging.
Built on #32560
When calling EventEngine::Read, if a synchronous WSARecv call completes successfully and 1) the read buffer is not full, and 2) the stream remains open, then the endpoint will now chain execution of more synchronous WSARecvs. The chain is broken and the on_read callback is called when either there are errors, the next call would block, the buffer is full, or the stream is closed.
Something like this is helpful to prevent excessive read callback execution under a flood of tiny payloads, presuming messages are not being combined as one would usually expect (see `//test/core/iomgr:endpoint_pair_test`, and Nagle's algorithm).