[shipper] Make the memory queue accept opaque pointers #31356
faec merged 46 commits into elastic:main
Conversation
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)
```go
// there might also be free space before region A. In that
// case new events must be inserted in region B, but the
// queue isn't at capacity.
avail = len(b.entries) - b.regA.index - b.regA.size
```
What is the follow-up from your comment about this possibly not being right? Is it too much work to fix, or not worth fixing?
If I understand the intention correctly then it's an easy fix, just removing `b.regA.index` from the right side. I've been leaving it for last because I don't want to intentionally change the functional logic until everything is at full parity with the old version (which right now is just pending on the stress tests).
Hah -- this turned out to be the cause of the test failure 😅 This is the same computation as the old version, but before it was only made on a specific state transition, and now that it's checked every loop iteration it ended up blocking the queue. Switching to the correct calculation here makes the tests pass locally, so fingers crossed on the CI now.
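For readers following along, here is a minimal, self-contained sketch of the two-region ring-buffer layout this thread is about. The field names mirror the snippet under discussion, but the surrounding types and the `avail` helper are reconstructions for illustration, not the actual beats code:

```go
package main

import "fmt"

// region describes a contiguous run of occupied slots in the buffer.
type region struct {
	index int // start position of the region in entries
	size  int // number of occupied slots
}

// ringBuffer is a simplified sketch of a two-region ring buffer:
// region A holds the older entries, and once writes wrap around the
// end of the array, region B grows from index 0 toward region A.
type ringBuffer struct {
	entries []interface{}
	regA    region
	regB    region // regB.index is always 0 when region B is in use
}

// avail reports how many slots can still be written.
func (b *ringBuffer) avail() int {
	if b.regB.size > 0 {
		// Writes go to region B, which can grow until it reaches
		// the start of region A.
		return b.regA.index - b.regB.size
	}
	// Only region A is in use. Free space is everything outside it --
	// both after its end and (once writes wrap) before its start --
	// which is why regA.index must NOT be subtracted here.
	return len(b.entries) - b.regA.size
}

func main() {
	b := &ringBuffer{
		entries: make([]interface{}, 10),
		regA:    region{index: 3, size: 5}, // occupies slots 3..7
	}
	fmt.Println(b.avail()) // 5 free slots (0..2 and 8..9)

	b.regB = region{index: 0, size: 2} // writes have wrapped
	fmt.Println(b.avail())             // region B can only grow into slot 2: 1
}
```

Under this reading, including `b.regA.index` in the no-region-B case undercounts the free space, which matches the "queue isn't at capacity" comment in the diff.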
```go
broker := l.broker

for {
	var pushChan chan pushRequest
```
Is this duplicated entirely between here and `newDirectEventLoop`? It is hard for me to spot whether there is some subtle difference between the two just by scrolling up and down.
It's not quite duplicated -- it's the same logical sequence, but because the containing struct is different the conditions don't match (e.g. here we check if the queue is full by comparing `eventCount` to `maxEvents`, but in the version above, `directEventLoop` has no field analogous to `eventCount`, so it uses a different test).
I suspect that having these two almost-identical objects with such completely divergent implementations just for a special case doesn't help performance enough to justify the complexity, and if I get a chance I'd like to merge these into a single helper, but that seemed out of scope for now :-)
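The merge suggested here could look something like the following sketch, where the two loops' differing fullness checks hide behind a shared interface so one run loop can serve both. All names are hypothetical; this is not the actual beats API:

```go
package main

import "fmt"

// queueState abstracts the fields that differ between the two loops:
// one checks a counter against a limit, the other checks its buffer.
type queueState interface {
	full() bool
}

// countedQueue stands in for the loop that compares eventCount to
// maxEvents.
type countedQueue struct{ eventCount, maxEvents int }

func (q *countedQueue) full() bool { return q.eventCount >= q.maxEvents }

// bufferedQueue stands in for the loop whose fullness depends on its
// buffer occupancy instead of an event counter.
type bufferedQueue struct{ pending, capacity int }

func (q *bufferedQueue) full() bool { return q.pending >= q.capacity }

// runLoopStep is the shared logic: only accept a push when the queue
// reports it has room.
func runLoopStep(q queueState) string {
	if q.full() {
		return "block pushes"
	}
	return "accept pushes"
}

func main() {
	fmt.Println(runLoopStep(&countedQueue{eventCount: 5, maxEvents: 5}))
	fmt.Println(runLoopStep(&bufferedQueue{pending: 1, capacity: 4}))
}
```

The trade-off the comment alludes to: one shared helper removes the near-duplication, at the cost of an interface call in the hot loop.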
```go
)

for {
	var pushChan chan pushRequest
```
This is significantly more obvious than what was going on before. Nice!
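The pattern being praised relies on a Go select property: a receive on a nil channel can never proceed, so recomputing the channel variable at the top of each loop iteration enables or disables that case. Here is a small standalone illustration (all names are made up for the example; the real loop blocks in `select` rather than using `default`, which is only here to make the demo bounded):

```go
package main

import "fmt"

type pushRequest struct{ event interface{} }

// drain runs a bounded version of the run loop. Each iteration it
// recomputes pushChan: while the queue has room, pushChan aliases the
// real channel; once the queue is full it stays nil, and a receive on
// a nil channel never fires, so the select falls through to default.
func drain(incoming chan pushRequest, maxEvents, iterations int) []string {
	var log []string
	eventCount := 0
	for i := 0; i < iterations; i++ {
		var pushChan chan pushRequest
		if eventCount < maxEvents {
			pushChan = incoming
		}
		select {
		case req := <-pushChan:
			eventCount++
			log = append(log, fmt.Sprintf("accepted: %v", req.event))
		default:
			log = append(log, "push case disabled")
		}
	}
	return log
}

func main() {
	incoming := make(chan pushRequest, 3)
	incoming <- pushRequest{event: "a"}
	incoming <- pushRequest{event: "b"}
	incoming <- pushRequest{event: "c"}
	for _, line := range drain(incoming, 2, 3) {
		fmt.Println(line)
	}
	// Output:
	// accepted: a
	// accepted: b
	// push case disabled
}
```

Initializing `pushChan` immediately before the `select`, as the diff does, keeps the enable/disable decision in one visible place instead of scattering channel-field mutations across state transitions.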
Just looking at the diff I can't spot any major issues. I'll try to check this out and build more of an understanding of what this is doing later (after your refactoring, which is much easier to follow). Also, I had never seen that test failure before.
Yea, I've never triggered that one before either, but it's related to sending loads through the pipeline so it's almost certainly a real failure. Right now I'm debugging it expecting that I missed a race condition somewhere.
LGTM, give the rest of the team some time to look at it before merging though
kvch left a comment:

Thank you, awesome as always!
## What does this PR do?
Refactors the memory queue internal data structures to accept opaque pointers (`interface{}`) for its events rather than an explicit `publisher.Event`. This is needed for the queue to store the event representations anticipated in https://github.com/elastic/elastic-agent-shipper.

This doesn't fully resolve #31307 because it doesn't yet expose a type-agnostic public interface. This PR is already pretty big and I don't want it to eat into ON week, so I'm deferring those questions until I can give them full attention.
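As a rough illustration of what "opaque pointers" means here: events are stored as `interface{}` alongside their metadata, and consumers recover the concrete type with a type assertion. The types below are hypothetical stand-ins, not the actual beats definitions:

```go
package main

import "fmt"

// publisherEvent stands in for beats' publisher.Event.
type publisherEvent struct{ Content string }

// queueEntry mirrors the merged buffer element described in the PR:
// the event itself is opaque to the queue, and its per-client
// metadata lives next to it instead of in a parallel array.
// Field names are illustrative.
type queueEntry struct {
	event      interface{} // in beats, underlying type *publisher.Event
	producerID int         // stand-in for the per-client metadata
}

func main() {
	buf := make([]queueEntry, 0, 4)
	buf = append(buf, queueEntry{
		event:      &publisherEvent{Content: "hello"},
		producerID: 1,
	})

	// The queue never inspects the event; consumers that know the
	// concrete type recover it with a type assertion.
	if ev, ok := buf[0].event.(*publisherEvent); ok {
		fmt.Println(ev.Content) // hello
	}
}
```

This is what lets the same queue later hold the shipper's event representation: only the endpoints that produce and consume events need to agree on the concrete type.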
This change should, in a perfect world, be a functional no-op: it changes internal handling but the exposed API is unchanged. The main changes are:
- Merged `events` and `clients` into `queueEntry`. The memory queue previously stored events as `publisher.Event`, and their metadata in `clientState`. These were stored in separate arrays with shared indices, propagated in various ways. The new code creates the type `queueEntry` as its underlying buffer type, which contains the event (an `interface{}`, which in beats has underlying type `*publisher.Event`) and its metadata. This change had to be propagated through a number of internal helpers like `memqueue.ringBuffer` and the `ackState` in `memqueue.batch`.
- There were also some fields that were duplicates of others -- in `eventloop.go` the event loops had pointers to their associated broker and their own unaltered copies of several of its fields. I removed these when I could.
- The event loops kept channel fields (`pushRequest`, `getRequest`, etc.) which were selectively nulled out on appropriate state changes (e.g. if the queue is full after a `pushRequest` then the push channel is set to nil to block additional requests). This got quite hard to follow during the changes, since the fields were mutated throughout the code and their semantics were undocumented. I moved the channels into local variables in the run loop, initializing them immediately before their use in `select`. This keeps the logic in one place, and it's clearer now what specific circumstances can enable / disable each channel.

## Checklist
- I have made corresponding changes to the documentation
- I have made corresponding changes to the default configuration files
- I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.

## Performance tests
I ran some extra benchmarks using `libbeat/publisher/pipeline/stress`. The main configurations I tested were the buffered vs. direct event loop. The tests were 1-minute samples sending a continuous stream of events through the queue. They were run on `main` and again after switching to the PR branch. The top-level results were:
Event throughput (per minute)
Total allocations:
In use allocations:
As expected for the nature of the change, the total allocations are noticeably higher, since a lot of the complexity of the `publisher.Event` handling was there to avoid allocating temporary values. However, the throughput is fine. While in-use memory is up, it is still reasonable (8MB to send 69 million events).

I also tested these configurations using the `blocking` output test (`out=blocking` in the test name), which adds a `min_wait` to the configuration. Remarkably, both old and new queues had exactly the same throughputs (though on the order of 30K rather than 70M). The total allocations were up in the new version, but in-use was slightly down.

Overall these results look to me like we are paying slightly for this simplification, but nothing that seems worrying. I'm expecting to do more pipeline performance work soon, and this cleanup gives a good baseline for tracking down our real bottlenecks.
## Related issues