Describe the enhancement: The disk queue necessarily uses in-memory queues for messages waiting to be written to disk, or to be written to the output after reading from disk. The amount of in-memory data can be capped, but (as with the memory queue) currently only by specifying a maximum number of events; there is no way to specify a maximum number of bytes, so the degree of control is dependent on having consistent / predictable event sizes. We should add parameters that can specify the maximum as a number of bytes instead, so it is easy to correctly tune a node's memory usage independent of its specific event flow.
Describe a specific use case for the enhancement or feature: This feature was not a priority in the initial release because the most urgent use case we understood at the time was data persistence. However, we are now seeing it deployed as an alternative to the memory queue specifically to work around the constraints of low-memory nodes, and these configuration options could greatly increase its effectiveness in that setting.
Technical sketch
There are two changes needed to support this feature, one for the intake queue (where events wait to be written to disk), and one for the output queue (where events that have been read from disk wait to be assigned to an output worker).
Intake queue
This change should be fairly simple: in libbeat/publisher/pipeline/queue/diskqueue/core_loop.go, the function diskQueue.canAcceptFrameOfSize() decides whether to accept a new event into the intake queue (diskQueue.pendingFrames). Currently its check is based only on the number of events already in the queue, but adding a size check would be straightforward (there are already helpers that calculate the size of the intake queue).
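A minimal sketch of what the size-aware check could look like. This is an illustrative model only, not the actual Beats code: the field names (pendingFrameBytes, maxPendingBytes, maxPendingCount) and the exact shape of the struct are assumptions; the real canAcceptFrameOfSize would use the existing intake-size helpers mentioned above.

```go
package main

import "fmt"

// diskQueue is a hypothetical, simplified stand-in for the real structure.
type diskQueue struct {
	pendingFrameCount int    // events already waiting to be written
	pendingFrameBytes uint64 // total bytes of those events
	maxPendingCount   int    // existing event-count cap (0 = unlimited)
	maxPendingBytes   uint64 // proposed new byte cap (0 = unlimited)
}

// canAcceptFrameOfSize reports whether a new frame of frameSize bytes can
// be admitted to the intake queue without exceeding either cap.
func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool {
	if dq.maxPendingCount > 0 && dq.pendingFrameCount+1 > dq.maxPendingCount {
		return false
	}
	if dq.maxPendingBytes > 0 && dq.pendingFrameBytes+frameSize > dq.maxPendingBytes {
		return false
	}
	return true
}

func main() {
	dq := &diskQueue{pendingFrameBytes: 900, maxPendingBytes: 1000}
	fmt.Println(dq.canAcceptFrameOfSize(50))  // true: fits under the byte cap
	fmt.Println(dq.canAcceptFrameOfSize(200)) // false: would exceed the byte cap
}
```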
Output queue
This will require more significant changes, but is still feasible. The output queue is currently in the channel readerLoop.output in libbeat/publisher/pipeline/queue/diskqueue/reader_loop.go. Right now the simple event-count cap is implemented only by setting the buffer size of the channel.
To make it aware of size constraints:
- The core loop (core_loop.go) must track how many bytes have been read from disk that have not yet been claimed by a consumer. This will likely be a byte counter in the main diskQueue structure. (The number of bytes read / allocated by the readerLoop as it reads from disk is already reported back in the readerLoopResponse, but currently it is only used to track queue position; handleReaderLoopResponse should also update the number of outstanding bytes in memory.)
- The event consumer (consumer.go), which reads from readerLoop.output, must inform the core loop of how much data it has claimed (which can then be subtracted from the total outstanding).
- The readerLoopRequest sent in core_loop.go:maybeReadPending must now calculate its endPosition based not only on how much total data is available, but also on how much memory is free.
Subtlety: currently the reader loop can send events to the outputs while they are still being read, i.e. before it has sent the response to the core loop confirming how many bytes are used. Thus, "acknowledgements" from the consumer may come in before we have confirmation from the reader loop that the memory was occupied in the first place.
This is ok, however (as long as the books are balanced): when the core loop sends the reader loop request with the memory cap, that memory should already be considered "used", and thus we can safely "free" that memory quota when it is claimed by a consumer. The exact byte count in the reader loop response is only needed to detect when the real memory use is less than the amount reserved when sending the request.