The current draft of the Beats shipper output waits for full transmission and acknowledgment of every batch of events before it sends another. This means that no matter how big the shipper queue is, the Beats shipper output never uses more than one batch's worth of it, and overall throughput is limited -- much as if a Beats input waited on full end-to-end acknowledgment between each Publish call.
Instead, we need to send batches as they come in and handle acknowledgment asynchronously, so events accumulate in the shipper queue as intended.
A sketch of how this might be implemented, from a thread on the original PR (elastic/beats#32708):
(1) Add a list of pending batches to the shipper struct:
```go
type shipper struct {
	...
	pending []struct {
		batch publisher.Batch
		index uint64
	}
	pendingLock sync.Mutex
}
```
(2) Modify `shipper.Publish` so it sends all events in the batch (making multiple upstream publish calls if necessary), and saves the batch, along with the final accepted index, in `shipper.pending`. [This is more in keeping with the intent of the API anyway: currently the shipper only blocks until at least one event is sent, but that's just because the queue is full, not because some of the events really "failed." So marking these events as failed and needing retry in the Beats output doesn't fit the usual semantics -- we don't want to be potentially dropping lots of events over "repeated failures" when all that's really happening is back pressure.]
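A rough, self-contained sketch of what that `Publish` loop could look like. The real shipper types aren't shown here, so this uses toy stand-ins: `publishUpstream`, the event slices, and the index bookkeeping are all illustrative assumptions, not the actual shipper API.

```go
package main

import (
	"fmt"
	"sync"
)

// batch is a toy stand-in for publisher.Batch.
type batch struct{ events []string }

type pendingBatch struct {
	batch *batch
	index uint64 // final accepted index for this batch's events
}

type shipper struct {
	nextIndex   uint64 // index assigned to the next accepted event
	pending     []pendingBatch
	pendingLock sync.Mutex
}

// publishUpstream models the upstream RPC: it accepts at most max events
// per call, so Publish may need several calls to send one batch.
func (s *shipper) publishUpstream(events []string, max int) int {
	n := len(events)
	if n > max {
		n = max
	}
	s.nextIndex += uint64(n)
	return n
}

// Publish sends every event in the batch (looping if the upstream accepts
// only part of it), then records the batch and its final accepted index so
// acknowledgment can happen asynchronously instead of blocking here.
func (s *shipper) Publish(b *batch) {
	events := b.events
	for len(events) > 0 {
		accepted := s.publishUpstream(events, 2)
		events = events[accepted:]
	}
	s.pendingLock.Lock()
	s.pending = append(s.pending, pendingBatch{batch: b, index: s.nextIndex - 1})
	s.pendingLock.Unlock()
}

func main() {
	s := &shipper{}
	s.Publish(&batch{events: []string{"a", "b", "c"}}) // final index 2
	s.Publish(&batch{events: []string{"d", "e"}})      // final index 4
	fmt.Println("pending:", len(s.pending))
}
```

The point of the sketch is that `Publish` returns as soon as the events are handed off upstream; nothing waits for acknowledgment, so the queue can keep filling while earlier batches are still in flight.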
(3) Add a continuous listener to `PersistedIndex`, and when a new value comes in, do something like:
```go
s.pendingLock.Lock()
batchCount := 0
for batchCount < len(s.pending) && s.pending[batchCount].index < newPersistedIndex {
	s.pending[batchCount].batch.ACK()
	batchCount++
}
s.pending = s.pending[batchCount:]
s.pendingLock.Unlock()
```
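That loop needs to run each time a new persisted index arrives. A self-contained sketch of the continuous listener, using a channel of index updates and toy stand-ins for the real batch and shipper types (all names here are illustrative, not the actual shipper API):

```go
package main

import (
	"fmt"
	"sync"
)

// batch is a toy stand-in for publisher.Batch.
type batch struct{ id int }

func (b *batch) ACK() { fmt.Printf("acked batch %d\n", b.id) }

type pendingBatch struct {
	batch *batch
	index uint64 // final accepted index for this batch's events
}

type shipper struct {
	pending     []pendingBatch
	pendingLock sync.Mutex
}

// onPersistedIndex acknowledges every pending batch whose final accepted
// index is below the newly persisted index, then drops them from the list.
// Batches are appended in send order, so the fully persisted ones are
// always a prefix of s.pending.
func (s *shipper) onPersistedIndex(newPersistedIndex uint64) {
	s.pendingLock.Lock()
	batchCount := 0
	for batchCount < len(s.pending) && s.pending[batchCount].index < newPersistedIndex {
		s.pending[batchCount].batch.ACK()
		batchCount++
	}
	s.pending = s.pending[batchCount:]
	s.pendingLock.Unlock()
}

func main() {
	s := &shipper{pending: []pendingBatch{
		{batch: &batch{id: 1}, index: 4},
		{batch: &batch{id: 2}, index: 9},
	}}
	updates := make(chan uint64)
	done := make(chan struct{})
	go func() { // the continuous listener
		for idx := range updates {
			s.onPersistedIndex(idx)
		}
		close(done)
	}()
	updates <- 5 // events up to index 4 persisted: batch 1 is complete
	close(updates)
	<-done
	fmt.Println("still pending:", len(s.pending))
}
```

Because the listener runs on its own goroutine and only takes `pendingLock` briefly, `Publish` and acknowledgment never block each other for longer than a list append or trim.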