Add entry IDs to the memory queue#32541
Conversation
|
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane) |
|
Added some more bits around metrics reporting, since I realized this code reported event IDs to the producers and consumers but the shipper also needs a standalone call to get the current ID, which I've added to the metrics struct already provided by the memory queue. |
|
The existing tests pass, the new tests for the event ID feature still need work:
|
|
|
||
| testForward := func(q queue.Queue) { | ||
| waiter := &producerACKWaiter{} | ||
| producer := q.Producer(queue.ProducerConfig{ACK: waiter.ack}) |
There was a problem hiding this comment.
Is it worth having a test case for concurrent producers?
|
LGTM, I'll wait for @rdner to confirm that his comments have been addressed before approving. |
|
Would also be good to get @leehinman to look at this, I've pinged him on Slack. |
leehinman
left a comment
There was a problem hiding this comment.
In a follow on PR could we get a doc with some high level design docs/diagrams.
| } | ||
|
|
||
| func (batch *diskQueueBatch) ID(i int) queue.EntryID { | ||
| return 0 |
There was a problem hiding this comment.
Am I correct in assuming a "real" event ids for the disk queue would be in a follow on PR? If so can we open an issue and make sure it is marked as "related" in this PR?
There was a problem hiding this comment.
Yes, the issue is elastic/elastic-agent-shipper#27
What does this PR do?
Adds an incrementing ID to events in the memory queue, as described in elastic/elastic-agent-shipper#27. This is needed by the shipper, to report to inputs whether their events have been sent yet.
Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration filesCHANGELOG.next.asciidocorCHANGELOG-developer.next.asciidoc.Related: elastic/elastic-agent-shipper#27