Some inputs need a way to persist their position in a data stream, such as a log file or a Kafka topic. To guarantee at-least-once delivery of these events, it's not enough to just send a publish request to the shipper, since the shipper may shut down before successfully publishing all its queued events, and in this case the input's persisted position should not include events that were dropped during shutdown.
To support this, the shipper process maintains an internal ordering of queued events by an ascending ID. With each API call, the shipper reports the position of the highest sequential event ID that has been "persisted" -- either written to the configured output, or written to disk when the disk queue is in use (meaning that even if it is not published to an output during this run, it has been saved and will be published the next time the shipper starts). Once an event is reported as persisted, an input may safely update its internal position to reflect that those events have been processed.
Because the IDs assigned to events are specific to the shipper process, they are not preserved between restarts. To help inputs recognize and invalidate old IDs when the shipper restarts, the shipper process is assigned a UUID on startup, which is reported in API responses. While the shipper doesn't restart under normal operation, this gives inputs a way to provide additional robustness when the system is recovering from an error.
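As a rough illustration of how an input might combine the persisted index and the shipper UUID, the following Python sketch keeps a queue of pending source positions and advances its committed position only as far as the shipper reports. The class, method, and parameter names (`PositionTracker`, `accepted_index`, `persisted_index`, `uuid`) are hypothetical and are not taken from the shipper API; only the behavior described above is assumed.

```python
from collections import deque
from typing import Deque, Optional, Tuple


class PositionTracker:
    """Hypothetical input-side helper; not part of the shipper API."""

    def __init__(self, committed_position: int = 0) -> None:
        # Last source position (e.g. a file offset) known to be safe to persist.
        self.committed_position = committed_position
        self.shipper_uuid: Optional[str] = None
        # Pairs of (event ID reported for a publish call,
        #           source position reached after that call).
        self.pending: Deque[Tuple[int, int]] = deque()

    def on_publish_reply(self, uuid: str, accepted_index: int,
                         source_position: int) -> None:
        """Record the single event ID returned for one publish call."""
        self._check_uuid(uuid)
        self.pending.append((accepted_index, source_position))

    def on_persisted(self, uuid: str, persisted_index: int) -> int:
        """Advance the committed position up to the reported persisted index."""
        self._check_uuid(uuid)
        while self.pending and self.pending[0][0] <= persisted_index:
            _, self.committed_position = self.pending.popleft()
        return self.committed_position

    def _check_uuid(self, uuid: str) -> None:
        if self.shipper_uuid is not None and uuid != self.shipper_uuid:
            # The shipper restarted: IDs from the old process are meaningless,
            # so unconfirmed entries are discarded. The input would then resume
            # reading from committed_position.
            self.pending.clear()
        self.shipper_uuid = uuid
```

In this sketch, a UUID change never moves the committed position; it only invalidates unconfirmed entries, so the input re-reads from its last confirmed position and at-least-once delivery is preserved at the cost of possible duplicates.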
The approach of the current design is that inputs should not be responsible for detecting or handling errors during publication. The shipper reports only the minimum information needed for an input to maintain its position within the data source. Anything more granular than that belongs in the shipper itself, via appropriately configured error handling policies. We want the input itself to have minimal responsibility, so it is easy and practical to add new or custom inputs without complicated internal logic, and we want the shipper to have a robust enough internal error reporting mechanism that anything important can be surfaced there.
These goals led to several concrete decisions about acknowledgment handling:
- Lack of metadata: Early prototypes provided much more granular metadata -- publication details, errors, user-definable tags, etc., for every single event, optionally keyed by the input or datastream that generated it. However, tracking this much data with reasonable TTL policies was a heavy technical burden. Furthermore, there was no essential use case: the practical use of acknowledgments for the input API is almost exclusively to track the position within some persistent data sequence that should survive a restart (as in the filestream or Kafka inputs).
- One event ID per publish request: Similar to the overall lack of metadata, the goal was to minimize the implementation burden on the input. While we could give the explicit queue IDs of all published events, allowing more granular tracking of the data stream position, actually tracking and exploiting that data would be inconvenient for a lightweight input, and would provide minimal benefit. By reducing to only the single highest ID per reply, we at worst might needlessly re-ingest part of a single Publish call after a bad shutdown. Since there is already no way to completely prevent this, we chose the simpler path.
- Keying the event ordering by the shipper process UUID: Ideally we would prefer not to include this field, especially since it is irrelevant under normal operation (the shipper UUID will only change when the shipper restarts, and the shipper only restarts when the process has crashed or become nonresponsive). However, in the worst case, an undetected restart could permanently stall an input like Kafka where confirmed checkpoints are essential to advancing the data stream. Agent itself doesn't communicate to inputs about shipper restarts, nor does it maintain process metadata that would serve this purpose. Thus, the UUID is a simple mechanism that merely detects when that sort of error recovery is necessary, and can be ignored by inputs for which it is not relevant.
- "Persisted" includes events which are dropped by processors, or which encountered fatal errors: Some events may not be ingestible by their target output, such that retrying indefinitely will never succeed and will instead stall the ingestion pipeline. Some events may encounter no errors, but may be dropped as a result of the processor configuration. When this happens, these events are still considered "persisted": they have reached a point where no further action is possible, and thus the input stream should advance rather than deadlocking. The input itself shouldn't need to know more than that -- any reporting and escalation of such errors should happen from the shipper. The `persisted_index` reported by the shipper doesn't represent guaranteed successful ingestion, merely a successful handoff to a reliable handler, so it should only be used to track input data positions and not to infer anything about the final state of any particular event.
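
The decisions above can be sketched from the shipper's side as a small bookkeeping structure. This is a minimal Python illustration under stated assumptions, not the shipper's actual implementation: the names `PersistedIndex`, `Outcome`, `enqueue`, and `resolve` are hypothetical, and IDs are assumed to start at 1. It shows one ID being returned per publish call, and the persisted index advancing over any terminal outcome, including drops and fatal errors.

```python
from enum import Enum, auto
from typing import Set


class Outcome(Enum):
    """Terminal states an event can reach (hypothetical names)."""
    PUBLISHED = auto()
    WRITTEN_TO_DISK = auto()
    DROPPED_BY_PROCESSOR = auto()
    FATAL_ERROR = auto()


class PersistedIndex:
    """Hypothetical bookkeeping for the highest sequential persisted ID."""

    def __init__(self) -> None:
        self.next_id = 1          # assumption: IDs are assigned starting at 1
        self.persisted_index = 0  # 0 means nothing has been persisted yet
        self._resolved: Set[int] = set()

    def enqueue(self, count: int) -> int:
        """Assign ascending IDs to `count` events; return only the highest."""
        self.next_id += count
        return self.next_id - 1

    def resolve(self, event_id: int, outcome: Outcome) -> int:
        """Mark an event as finished and return the updated persisted index.

        The outcome is deliberately not inspected: every terminal state
        counts as "persisted" for the purpose of advancing input positions.
        """
        self._resolved.add(event_id)
        # Advance only across a contiguous run of resolved IDs.
        while self.persisted_index + 1 in self._resolved:
            self.persisted_index += 1
            self._resolved.remove(self.persisted_index)
        return self.persisted_index
```

Because the index only moves across a contiguous run of resolved IDs, an event that finishes out of order does not advance it until every earlier event has also reached a terminal state.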