Skip to content

Commit 2e7b902

Browse files
authored
[libbeat] Add more disk queue unit tests and fix a size-check bug (elastic#22107)
1 parent f33bfd9 commit 2e7b902

3 files changed

Lines changed: 594 additions & 58 deletions

File tree

libbeat/publisher/queue/diskqueue/core_loop.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
9393
// than an entire segment all by itself (as long as it isn't, it is
9494
// guaranteed to eventually enter the queue assuming no disk errors).
9595
frameSize := request.frame.sizeOnDisk()
96-
if dq.settings.MaxSegmentSize < frameSize {
96+
if dq.settings.maxSegmentOffset() < segmentOffset(frameSize) {
9797
dq.logger.Warnf(
98-
"Rejecting event with size %v because the maximum segment size is %v",
99-
frameSize, dq.settings.MaxSegmentSize)
98+
"Rejecting event with size %v because the segment buffer limit is %v",
99+
frameSize, dq.settings.maxSegmentOffset())
100100
request.responseChan <- false
101101
return
102102
}
@@ -326,13 +326,19 @@ func (dq *diskQueue) maybeWritePending() {
326326
// Nothing to do right now
327327
return
328328
}
329+
329330
// Remove everything from pendingFrames and forward it to the writer loop.
330331
frames := dq.pendingFrames
331332
dq.pendingFrames = nil
333+
dq.writerLoop.requestChan <- writerLoopRequest{frames: frames}
332334

333-
dq.writerLoop.requestChan <- writerLoopRequest{
334-
frames: frames,
335+
// Compute the size of the request so we know how full the queue is going
336+
// to be.
337+
totalSize := uint64(0)
338+
for _, sf := range frames {
339+
totalSize += sf.frame.sizeOnDisk()
335340
}
341+
dq.writeRequestSize = totalSize
336342
dq.writing = true
337343
}
338344

@@ -471,8 +477,12 @@ func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool {
471477
// left in the queue after accounting for the existing segments and the
472478
// pending writes that were already accepted.
473479
pendingBytes := uint64(0)
474-
for _, request := range dq.pendingFrames {
475-
pendingBytes += request.frame.sizeOnDisk()
480+
for _, sf := range dq.pendingFrames {
481+
pendingBytes += sf.frame.sizeOnDisk()
482+
}
483+
// If a writing request is outstanding, include it in the size total.
484+
if dq.writing {
485+
pendingBytes += dq.writeRequestSize
476486
}
477487
currentSize := pendingBytes + dq.segments.sizeOnDisk()
478488

0 commit comments

Comments
 (0)