This repository was archived by the owner on Sep 21, 2023. It is now read-only.

Beats shipper output should not wait for full acknowledgment before sending more batches #98

@faec

The current draft of the Beats shipper output waits for full transmission and acknowledgment of every batch of events before it sends another. In particular, this means that no matter how big the shipper queue is, the Beats shipper output will never use more than one batch's worth of it, and overall throughput is limited to one batch per send-and-acknowledge cycle, much as it would be if a Beats input waited for full end-to-end acknowledgment between Publish calls.

Instead, we need to send batches as they come in and handle acknowledgment asynchronously, so events accumulate in the shipper queue as intended.

A sketch of how this might be implemented, from a thread on the original PR (elastic/beats#32708):

(1) Add a list of pending batches to the shipper struct:

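```go
type shipper struct {
	// ...

	// Batches sent upstream but not yet persisted, oldest first.
	pending []struct {
		batch publisher.Batch
		index uint64 // final accepted index of the batch's events
	}
	pendingLock sync.Mutex // guards pending
}
```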

(2) Modify shipper.Publish so it sends all events in the batch (making multiple upstream publish calls if necessary) and saves the batch, along with the final accepted index, in shipper.pending. [This is more in keeping with the intent of the API anyway: currently the shipper only blocks until at least one event is sent, but that's just because the queue is full, not because the remaining events really "failed." Marking those events as failed and in need of retry in the Beats output doesn't fit the usual semantics; we don't want to risk dropping lots of events over "repeated failures" when all that's really happening is back pressure.]

(3) Add a continuous listener to PersistedIndex, and when a new value comes in do something like:

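```go
s.pendingLock.Lock()
batchCount := 0
// ACK, oldest first, every pending batch whose final accepted index has been
// persisted. Assumes both indices are inclusive (they name the last event).
for batchCount < len(s.pending) && s.pending[batchCount].index <= newPersistedIndex {
	s.pending[batchCount].batch.ACK()
	batchCount++
}
// Drop the acknowledged batches from the front of the list.
s.pending = s.pending[batchCount:]
s.pendingLock.Unlock()
```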
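The three steps above can be combined into a small, self-contained sketch. It is not the Beats implementation: Batch is a simplified stand-in for publisher.Batch, publishFunc stands in for the upstream publish call, a channel of uint64 stands in for the PersistedIndex stream, and the names pendingBatch, ackPersisted and ackLoop are hypothetical. It assumes Publish is called from one goroutine at a time (so pending stays sorted by index) and that an accepted index names a batch's last event inclusively, so a batch can be acknowledged once its index is <= the persisted index.

```go
package main

import (
	"fmt"
	"sync"
)

// Batch is a hypothetical, simplified stand-in for libbeat's publisher.Batch.
type Batch struct {
	Events []string
	ACK    func() // called once every event in the batch has been persisted
}

// publishFunc is a hypothetical stand-in for the upstream publish call: it accepts a
// prefix of events (possibly none) and returns how many, plus the last accepted index.
type publishFunc func(events []string) (accepted int, lastIndex uint64, err error)

type pendingBatch struct {
	batch Batch
	index uint64 // index of the batch's last accepted event (inclusive)
}

type shipper struct {
	publish     publishFunc
	pending     []pendingBatch // sent but not yet persisted, oldest first
	pendingLock sync.Mutex
}

// Publish sends every event in b, re-sending whatever was not accepted, then records b
// as pending instead of waiting for persistence. Assumes no concurrent calls.
func (s *shipper) Publish(b Batch) error {
	events, lastIndex := b.Events, uint64(0)
	for len(events) > 0 {
		// A real output would back off when nothing is accepted, not spin.
		n, idx, err := s.publish(events)
		if err != nil {
			return err
		}
		if n > 0 {
			lastIndex = idx
		}
		events = events[n:]
	}
	s.pendingLock.Lock()
	defer s.pendingLock.Unlock()
	s.pending = append(s.pending, pendingBatch{b, lastIndex})
	return nil
}

// ackPersisted ACKs, oldest first, each pending batch whose last index is
// <= persisted, drops those batches, and returns how many it ACKed.
func (s *shipper) ackPersisted(persisted uint64) int {
	s.pendingLock.Lock()
	defer s.pendingLock.Unlock()
	n := 0
	for n < len(s.pending) && s.pending[n].index <= persisted {
		s.pending[n].batch.ACK()
		n++
	}
	s.pending = s.pending[n:]
	return n
}

// ackLoop is the continuous listener; a channel stands in for the
// PersistedIndex stream. It closes done once updates is closed.
func (s *shipper) ackLoop(updates <-chan uint64, done chan<- struct{}) {
	for persisted := range updates {
		s.ackPersisted(persisted)
	}
	close(done)
}

func main() {
	var acked []string
	record := func(name string) func() { return func() { acked = append(acked, name) } }
	var next uint64 // fake upstream: accepts at most two events per call, indices from 0
	s := &shipper{publish: func(events []string) (int, uint64, error) {
		n := len(events)
		if n > 2 {
			n = 2
		}
		next += uint64(n)
		return n, next - 1, nil
	}}
	// Batch a gets indices 0-2 over two calls; batch b gets indices 3-4.
	s.Publish(Batch{[]string{"e1", "e2", "e3"}, record("a")}) // the fake never errors
	s.Publish(Batch{[]string{"e4", "e5"}, record("b")})
	fmt.Println(len(s.pending)) // 2: both sent, neither persisted yet
	updates, done := make(chan uint64), make(chan struct{})
	go s.ackLoop(updates, done)
	updates <- 3 // covers a (last index 2) but not b (last index 4)
	updates <- 4 // covers b
	close(updates)
	<-done
	fmt.Println(acked, len(s.pending)) // [a b] 0
}
```

Running it prints 2 and then [a b] 0: both batches are handed to the upstream queue before either is acknowledged, and each is ACKed only once the persisted index reaches its last event. Because Publish returns as soon as its events are accepted, several batches can be in flight at once, which is what lets events accumulate in the shipper queue.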
