[libbeat] Track frame counts in saved segments in the disk queue#22970
[libbeat] Track frame counts in saved segments in the disk queue#22970faec merged 17 commits intoelastic:masterfrom
Conversation
|
Pinging @elastic/integrations (Team:Integrations) |
💚 Build Succeeded
Expand to view the summary
Build stats
Test stats 🧪
Trends 🧪💚 Flaky test reportTests succeeded. Expand to view the summary
Test stats 🧪
|
fearful-symmetry
left a comment
There was a problem hiding this comment.
LGTM, with the caveat that I don't know nearly enough about this as you.
|
Hi! We're labeling this issue as |
|
Hi! We're labeling this issue as |
|
Hi! |
|
Hi! We're labeling this issue as |
| // If the segment is still in the writing list, we can't discard it | ||
| // until the writer loop is done with it, but we can hope that advancing | ||
| // to the current write position will get us out of our error state. | ||
| dq.segments.nextReadPosition = segment.byteCount |
There was a problem hiding this comment.
Given the way readers and writers are coordinated, an error in a segment that is currently written would likely indicate a bug in the business logic, a bug in the framing done by the writer, or an unexpected race condition. The system will most likely recover once we did start a new segment file. In this case we might want to use logger.Criticalf, in order to encourage users to report bugs.
Errors on older, already closed segment files might indicate a broken/invalid segment file, or a bug in the framing. The former is to be expected if the system was not shutdown cleanly. Error level would be enough I think.
| fullPath := path.Join(pathStr, file.Name()) | ||
| header, err := readSegmentHeaderWithFrameCount(fullPath) | ||
| if header == nil { | ||
| logger.Errorf("couldn't load segment file '%v': %v", fullPath, err) |
There was a problem hiding this comment.
I'd like to encourage us to use structured logging more. e.g. the loop could introduce its own logger like logger := logger.With("segment", segmentID(id)). Would we need the full path if we have had the segment ID?
There was a problem hiding this comment.
The full path is theoretically redundant since every segment is in the same directory, and the directory depends on the user configuration, but I thought it would be nice for the error to be explicit since I wouldn't expect whoever sees the message to know where the queue path is or how to find it.
|
/test |
) (cherry picked from commit 4b14493)
What does this PR do?
This PR adds the plumbing to keep track of how many frames (events) are stored on disk at a given time, by adding a
frameCountfield to the segment file header and aframeIndexfield to thequeuePositionstructure stored in the queue state. The initial version of the queue only tracked byte counts and positions, which don't convert easily to frame counts.On startup, the queue now attempts to load the segment header from any preexisting segments. If the segment header has no frame count (either because it's from a previous version or because the segment was not closed cleanly), it attempts to calculate the value manually with a linear scan of the segment's frame headers.
Since the pipeline metrics are not yet accessible to the queue, this PR has no user-visible changes except for a few log messages.
Why is it important?
This is necessary preparation for reporting the "real" active event count for the disk queue as required by #22602
Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration filesI have added an entry inCHANGELOG.next.asciidocorCHANGELOG-developer.next.asciidoc.