Skip to content

Commit 9f5ff30

Browse files
faec authored and mergify-bot committed
[libbeat] Track frame counts in saved segments in the disk queue (#22970)
(cherry picked from commit 4b14493)
1 parent 0334abb commit 9f5ff30

11 files changed

Lines changed: 511 additions & 295 deletions

File tree

libbeat/publisher/queue/diskqueue/acks.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,25 @@ import (
2424
"github.com/elastic/beats/v7/libbeat/logp"
2525
)
2626

27-
// queuePosition represents a logical position within the queue buffer.
27+
// queuePosition represents the position of a data frame within the queue: the
28+
// containing segment, and a byte index into that segment on disk.
29+
// It also stores the 0-based index of the current frame within its segment
30+
// file. (Note that this depends only on the segment file itself, and is
31+
// unrelated to the frameID type used to identify frames in memory.)
32+
// The frame index is logically redundant with the byte index, but
33+
// calculating it requires a linear scan of the segment file, so we store
34+
// both values so we can track frame counts without reading the whole segment.
35+
// When referencing a data frame, a byteIndex of 0 / uninitialized is
36+
// understood to mean the first frame on disk (the header offset is
37+
// added during handling); thus, `queuePosition{segmentID: 5}` always points
38+
// to the first frame of segment 5, even though the logical position on
39+
// disk depends on the header size, which can vary across schema versions.
40+
// However, a nonzero byteIndex is always interpreted as an exact
41+
// file position.
2842
type queuePosition struct {
29-
segmentID segmentID
30-
offset segmentOffset
43+
segmentID segmentID
44+
byteIndex uint64
45+
frameIndex uint64
3146
}
3247

3348
type diskQueueACKs struct {
@@ -114,15 +129,17 @@ func (dqa *diskQueueACKs) addFrames(frames []*readFrame) {
114129
newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID]
115130
if ok {
116131
// This is the start of a new segment. Remove this frame from the
117-
// segment boundary list and set the position to the start of the
118-
// new segment.
132+
// segment boundary list and reset the byte index to immediately
133+
// after the segment header.
119134
delete(dqa.segmentBoundaries, dqa.nextFrameID)
120135
dqa.nextPosition = queuePosition{
121-
segmentID: newSegment,
122-
offset: 0,
136+
segmentID: newSegment,
137+
byteIndex: segmentHeaderSize,
138+
frameIndex: 0,
123139
}
124140
}
125-
dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID])
141+
dqa.nextPosition.byteIndex += dqa.frameSize[dqa.nextFrameID]
142+
dqa.nextPosition.frameIndex++
126143
delete(dqa.frameSize, dqa.nextFrameID)
127144
}
128145
// We advanced the ACK position at least somewhat, so write its

libbeat/publisher/queue/diskqueue/config.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (c *userConfig) Validate() error {
8989
// be at least twice as large.
9090
if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 {
9191
return errors.New(
92-
"Disk queue max_size must be at least twice as big as segment_size")
92+
"disk queue max_size must be at least twice as big as segment_size")
9393
}
9494

9595
// We require a total queue size of at least 10MB, and a segment size of
@@ -100,17 +100,17 @@ func (c *userConfig) Validate() error {
100100
// restarts, it will work fine.
101101
if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 {
102102
return fmt.Errorf(
103-
"Disk queue max_size (%d) cannot be less than 10MB", c.MaxSize)
103+
"disk queue max_size (%d) cannot be less than 10MB", c.MaxSize)
104104
}
105105
if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 {
106106
return fmt.Errorf(
107-
"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
107+
"disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
108108
}
109109

110110
if c.RetryInterval != nil && c.MaxRetryInterval != nil &&
111111
*c.MaxRetryInterval < *c.RetryInterval {
112112
return fmt.Errorf(
113-
"Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
113+
"disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
114114
*c.MaxRetryInterval, *c.RetryInterval)
115115
}
116116

@@ -189,8 +189,10 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
189189
fmt.Sprintf("%v.seg", segmentID))
190190
}
191191

192-
func (settings Settings) maxSegmentOffset() segmentOffset {
193-
return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
192+
// maxValidFrameSize returns the size of the largest possible frame that
193+
// can be stored with the current queue settings.
194+
func (settings Settings) maxValidFrameSize() uint64 {
195+
return settings.MaxSegmentSize - segmentHeaderSize
194196
}
195197

196198
// Given a retry interval, nextRetryInterval returns the next higher level

libbeat/publisher/queue/diskqueue/consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ type diskQueueBatch struct {
4040

4141
func (consumer *diskQueueConsumer) Get(eventCount int) (queue.Batch, error) {
4242
if consumer.closed {
43-
return nil, fmt.Errorf("Tried to read from a closed disk queue consumer")
43+
return nil, fmt.Errorf("tried to read from a closed disk queue consumer")
4444
}
4545

4646
// Read at least one frame. This is guaranteed to eventually
4747
// succeed unless the queue is closed.
4848
frame, ok := <-consumer.queue.readerLoop.output
4949
if !ok {
50-
return nil, fmt.Errorf("Tried to read from a closed disk queue")
50+
return nil, fmt.Errorf("tried to read from a closed disk queue")
5151
}
5252
frames := []*readFrame{frame}
5353
eventLoop:

libbeat/publisher/queue/diskqueue/core_loop.go

Lines changed: 56 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
9393
// than an entire segment all by itself (as long as it isn't, it is
9494
// guaranteed to eventually enter the queue assuming no disk errors).
9595
frameSize := request.frame.sizeOnDisk()
96-
if dq.settings.maxSegmentOffset() < segmentOffset(frameSize) {
96+
if frameSize > dq.settings.maxValidFrameSize() {
9797
dq.logger.Warnf(
9898
"Rejecting event with size %v because the segment buffer limit is %v",
99-
frameSize, dq.settings.maxSegmentOffset())
99+
frameSize, dq.settings.maxValidFrameSize())
100100
request.responseChan <- false
101101
return
102102
}
@@ -129,14 +129,15 @@ func (dq *diskQueue) handleWriterLoopResponse(response writerLoopResponse) {
129129
// The writer loop response contains the number of bytes written to
130130
// each segment that appeared in the request. Entries always appear in
131131
// the same sequence as (the beginning of) segments.writing.
132-
for index, bytesWritten := range response.bytesWritten {
132+
for index, segmentEntry := range response.segments {
133133
// Update the segment with its new size.
134-
dq.segments.writing[index].endOffset += segmentOffset(bytesWritten)
134+
dq.segments.writing[index].byteCount += segmentEntry.bytesWritten
135+
dq.segments.writing[index].frameCount += segmentEntry.framesWritten
135136
}
136137

137138
// If there is more than one segment in the response, then all but the
138139
// last have been closed and are ready to move to the reading list.
139-
closedCount := len(response.bytesWritten) - 1
140+
closedCount := len(response.segments) - 1
140141
if closedCount > 0 {
141142
// Remove the prefix of the writing array and append it to reading.
142143
closedSegments := dq.segments.writing[:closedCount]
@@ -151,37 +152,19 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) {
151152

152153
// Advance the frame / offset based on what was just completed.
153154
dq.segments.nextReadFrameID += frameID(response.frameCount)
154-
dq.segments.nextReadOffset += segmentOffset(response.byteCount)
155+
dq.segments.nextReadPosition += response.byteCount
155156

156-
var segment *queueSegment
157-
if len(dq.segments.reading) > 0 {
158-
// A segment is finished if we have read all the data, or
159-
// the read response reports an error.
160-
// Segments in the reading list have been completely written,
161-
// so we can rely on their endOffset field to determine their size.
162-
segment = dq.segments.reading[0]
163-
if dq.segments.nextReadOffset >= segment.endOffset || response.err != nil {
164-
dq.segments.reading = dq.segments.reading[1:]
165-
dq.segments.acking = append(dq.segments.acking, segment)
166-
dq.segments.nextReadOffset = 0
167-
}
168-
} else {
169-
// A segment in the writing list can't be finished writing,
170-
// so we don't check the endOffset.
171-
segment = dq.segments.writing[0]
172-
if response.err != nil {
173-
// Errors reading a writing segment are awkward since we can't discard
174-
// them until the writer loop is done with them. Instead we just seek
175-
// to the end of the current data region. If we're lucky this lets us
176-
// skip the intervening errors; if not, the segment will be cleaned up
177-
// after the writer loop is done with it.
178-
dq.segments.nextReadOffset = segment.endOffset
179-
}
180-
}
157+
segment := dq.segments.readingSegment()
181158
segment.framesRead += response.frameCount
182-
183-
// If there was an error, report it.
184159
if response.err != nil {
160+
// If there's an error, we advance to the end of the current segment.
161+
// If the segment is in the reading list, it will be removed on the
162+
// next call to maybeReadPending.
163+
// If the segment is still in the writing list, we can't discard it
164+
// until the writer loop is done with it, but we can hope that advancing
165+
// to the current write position will get us out of our error state.
166+
dq.segments.nextReadPosition = segment.byteCount
167+
185168
dq.logger.Errorf(
186169
"Error reading segment file %s: %v",
187170
dq.settings.segmentPath(segment.id), response.err)
@@ -197,7 +180,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) {
197180
// This segment had an error, so it stays in the acked list.
198181
newAckedSegments = append(newAckedSegments, dq.segments.acked[i])
199182
errors = append(errors,
200-
fmt.Errorf("Couldn't delete segment %d: %w",
183+
fmt.Errorf("couldn't delete segment %d: %w",
201184
dq.segments.acked[i].id, err))
202185
}
203186
}
@@ -209,7 +192,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) {
209192
}
210193
dq.segments.acked = newAckedSegments
211194
if len(errors) > 0 {
212-
dq.logger.Errorw("Deleting segment files", "errors", errors)
195+
dq.logger.Errorw("deleting segment files", "errors", errors)
213196
}
214197
}
215198

@@ -304,7 +287,7 @@ func (dq *diskQueue) handleShutdown() {
304287
// delete things before the current segment.
305288
if len(dq.segments.writing) > 0 &&
306289
finalPosition.segmentID == dq.segments.writing[0].id &&
307-
finalPosition.offset >= dq.segments.writing[0].endOffset {
290+
finalPosition.byteIndex >= dq.segments.writing[0].byteCount {
308291
dq.handleSegmentACK(finalPosition.segmentID)
309292
} else if finalPosition.segmentID > 0 {
310293
dq.handleSegmentACK(finalPosition.segmentID - 1)
@@ -353,43 +336,53 @@ func (segments *diskQueueSegments) readingSegment() *queueSegment {
353336
return nil
354337
}
355338

339+
// If the first entry of the reading list has been completely consumed,
340+
// move it to the acking list and update the read position.
341+
func (dq *diskQueue) maybeAdvanceReadingList() {
342+
if len(dq.segments.reading) > 0 {
343+
segment := dq.segments.reading[0]
344+
if dq.segments.nextReadPosition >= segment.byteCount {
345+
dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0])
346+
dq.segments.reading = dq.segments.reading[1:]
347+
dq.segments.nextReadPosition = 0
348+
}
349+
}
350+
}
351+
356352
// If the reading list is nonempty, and there are no outstanding read
357353
// requests, send one.
358354
func (dq *diskQueue) maybeReadPending() {
359355
if dq.reading {
360356
// A read request is already pending
361357
return
362358
}
363-
// Check if the next reading segment has already been completely read. (This
364-
// can happen if it was being written and read simultaneously.) In this case
365-
// we should move it to the acking list and proceed to the next segment.
366-
if len(dq.segments.reading) > 0 &&
367-
dq.segments.nextReadOffset >= dq.segments.reading[0].endOffset {
368-
dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0])
369-
dq.segments.reading = dq.segments.reading[1:]
370-
dq.segments.nextReadOffset = 0
371-
}
359+
// If the current segment has already been completely read, move to
360+
// the next one.
361+
dq.maybeAdvanceReadingList()
362+
372363
// Get the next available segment from the reading or writing lists.
373364
segment := dq.segments.readingSegment()
374365
if segment == nil ||
375-
dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) {
366+
dq.segments.nextReadPosition >= segment.byteCount {
376367
// Nothing to read
377368
return
378369
}
379-
if dq.segments.nextReadOffset == 0 {
380-
// If we're reading the beginning of this segment, assign its firstFrameID
381-
// so we can recognize its acked frames later.
382-
// The first segment we read might not have its initial nextReadOffset
383-
// set to 0 if the segment was already partially read on a previous run.
384-
// However that can only happen when nextReadFrameID == 0, so we don't
385-
// need to do anything in that case.
370+
if dq.segments.nextReadPosition == 0 {
371+
// If we're reading this segment for the first time, assign its
372+
// firstFrameID so we can recognize its acked frames later, and advance
373+
// the reading position to the end of the segment header.
374+
// The first segment we read might not have the initial nextReadPosition
375+
// set to 0 if it was already partially read on a previous run.
376+
// However that can only happen when nextReadFrameID == 0, so in that
377+
// case firstFrameID is already initialized to the correct value.
386378
segment.firstFrameID = dq.segments.nextReadFrameID
379+
dq.segments.nextReadPosition = segment.headerSize()
387380
}
388381
request := readerLoopRequest{
389-
segment: segment,
390-
startFrameID: dq.segments.nextReadFrameID,
391-
startOffset: dq.segments.nextReadOffset,
392-
endOffset: segment.endOffset,
382+
segment: segment,
383+
startFrameID: dq.segments.nextReadFrameID,
384+
startPosition: dq.segments.nextReadPosition,
385+
endPosition: segment.byteCount,
393386
}
394387
dq.readerLoop.requestChan <- request
395388
dq.reading = true
@@ -433,18 +426,20 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) {
433426
if len(dq.segments.writing) > 0 {
434427
segment = dq.segments.writing[len(dq.segments.writing)-1]
435428
}
436-
frameLen := segmentOffset(frame.sizeOnDisk())
429+
newSegmentSize := dq.segments.writingSegmentSize + frame.sizeOnDisk()
437430
// If segment is nil, or the new segment exceeds its bounds,
438431
// we need to create a new writing segment.
439432
if segment == nil ||
440-
dq.segments.nextWriteOffset+frameLen > dq.settings.maxSegmentOffset() {
433+
newSegmentSize > dq.settings.MaxSegmentSize {
441434
segment = &queueSegment{id: dq.segments.nextID}
442435
dq.segments.writing = append(dq.segments.writing, segment)
443436
dq.segments.nextID++
444-
dq.segments.nextWriteOffset = 0
437+
// Reset the on-disk size to its initial value, the file's header size
438+
// with no frame data.
439+
newSegmentSize = segmentHeaderSize
445440
}
446441

447-
dq.segments.nextWriteOffset += frameLen
442+
dq.segments.writingSegmentSize = newSegmentSize
448443
dq.pendingFrames = append(dq.pendingFrames, segmentedFrame{
449444
frame: frame,
450445
segment: segment,

0 commit comments

Comments
 (0)