Skip to content

Commit 0e049f0

Browse files
authored
Limit the number of bytes read by LineReader in Filebeat (#19552)
1 parent 7007d97 commit 0e049f0

5 files changed

Lines changed: 244 additions & 22 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
217217
- Fix bug with empty filter values in system/service {pull}19812[19812]
218218
- Fix S3 input to trim delimiter /n from each log line. {pull}19972[19972]
219219
- Ignore missing in Zeek module when dropping unnecessary fields. {pull}19984[19984]
220+
- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552]
220221

221222
*Heartbeat*
222223

filebeat/input/log/harvester.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,8 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
631631
var r reader.Reader
632632
var err error
633633

634+
logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)
635+
634636
// TODO: NewLineReader uses additional buffering to deal with encoding and testing
635637
// for new lines in input stream. Simple 8-bit based encodings, or plain
636638
// don't require 'complicated' logic.
@@ -644,10 +646,17 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
644646
return nil, err
645647
}
646648

649+
// Configure the MaxBytes limit for EncodeReader as config.MaxBytes multiplied by 4
650+
// for the worst case scenario where incoming UTF32 characters are decoded to single-byte UTF-8 characters.
651+
// This limit serves primarily to avoid memory bloat or potential OOM with unexpectedly long lines in the file.
652+
// Further size limiting is performed by LimitReader at the end of the reader pipeline as needed.
653+
encReaderMaxBytes := h.config.MaxBytes * 4
654+
647655
r, err = readfile.NewEncodeReader(reader, readfile.Config{
648656
Codec: h.encoding,
649657
BufferSize: h.config.BufferSize,
650658
Terminator: h.config.LineTerminator,
659+
MaxBytes: encReaderMaxBytes,
651660
})
652661
if err != nil {
653662
return nil, err

libbeat/reader/readfile/encode.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Config struct {
3838
Codec encoding.Encoding
3939
BufferSize int
4040
Terminator LineTerminator
41+
MaxBytes int
4142
}
4243

4344
// New creates a new Encode reader from input reader by applying

libbeat/reader/readfile/line.go

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@ import (
2828
"github.com/elastic/beats/v7/libbeat/logp"
2929
)
3030

31+
const unlimited = 0
32+
3133
// LineReader reads lines from underlying reader, decoding the input stream
3234
// using the configured codec. The reader keeps track of bytes consumed
3335
// from raw input stream for every decoded line.
3436
type LineReader struct {
3537
reader io.Reader
3638
bufferSize int
39+
maxBytes int // max bytes per line limit to avoid OOM with malformatted files
3740
nl []byte
3841
decodedNl []byte
3942
inBuffer *streambuf.Buffer
@@ -62,6 +65,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
6265
return &LineReader{
6366
reader: input,
6467
bufferSize: config.BufferSize,
68+
maxBytes: config.MaxBytes,
6569
decoder: config.Codec.NewDecoder(),
6670
nl: nl,
6771
decodedNl: terminator,
@@ -121,17 +125,17 @@ func (r *LineReader) advance() error {
121125
// Initial check if buffer has already a newLine character
122126
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)
123127

124-
// fill inBuffer until newline sequence has been found in input buffer
128+
// Fill inBuffer until newline sequence has been found in input buffer
125129
for idx == -1 {
126-
// increase search offset to reduce iterations on buffer when looping
130+
// Increase search offset to reduce iterations on buffer when looping
127131
newOffset := r.inBuffer.Len() - len(r.nl)
128132
if newOffset > r.inOffset {
129133
r.inOffset = newOffset
130134
}
131135

132136
buf := make([]byte, r.bufferSize)
133137

134-
// try to read more bytes into buffer
138+
// Try to read more bytes into buffer
135139
n, err := r.reader.Read(buf)
136140

137141
// Appends buffer also in case of err
@@ -140,16 +144,39 @@ func (r *LineReader) advance() error {
140144
return err
141145
}
142146

143-
// empty read => return buffer error (more bytes required error)
147+
// Empty read => return buffer error (more bytes required error)
144148
if n == 0 {
145149
return streambuf.ErrNoMoreBytes
146150
}
147151

148152
// Check if buffer has newLine character
149153
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
154+
155+
// If max bytes limit per line is set, then drop the lines that are longer
156+
if r.maxBytes != 0 {
157+
// If newLine is found, drop the lines longer than maxBytes
158+
for idx != -1 && idx > r.maxBytes {
159+
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx)
160+
err = r.inBuffer.Advance(idx + len(r.nl))
161+
r.inBuffer.Reset()
162+
r.inOffset = 0
163+
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
164+
}
165+
166+
// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
167+
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
168+
skipped, err := r.skipUntilNewLine(buf)
169+
if err != nil {
170+
r.logger.Error("Error skipping until new line, err:", err)
171+
return err
172+
}
173+
r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
174+
idx = r.inBuffer.IndexFrom(r.inOffset, r.nl)
175+
}
176+
}
150177
}
151178

152-
// found encoded byte sequence for newline in buffer
179+
// Found encoded byte sequence for newline in buffer
153180
// -> decode input sequence into outBuffer
154181
sz, err := r.decode(idx + len(r.nl))
155182
if err != nil {
@@ -158,20 +185,63 @@ func (r *LineReader) advance() error {
158185
sz = idx + len(r.nl)
159186
}
160187

161-
// consume transformed bytes from input buffer
188+
// Consume transformed bytes from input buffer
162189
err = r.inBuffer.Advance(sz)
163190
r.inBuffer.Reset()
164191

165-
// continue scanning input buffer from last position + 1
192+
// Continue scanning input buffer from last position + 1
166193
r.inOffset = idx + 1 - sz
167194
if r.inOffset < 0 {
168-
// fix inOffset if newline has encoding > 8bits + firl line has been decoded
195+
// Fix inOffset if newline has encoding > 8bits + first line has been decoded
169196
r.inOffset = 0
170197
}
171198

172199
return err
173200
}
174201

202+
// skipUntilNewLine discards the bytes currently held in inBuffer and then keeps
// reading from the underlying reader into buf until the encoded line terminator
// (r.nl) shows up in a read chunk. Any bytes that follow the terminator in that
// chunk are appended to inBuffer so they become the start of the next line.
//
// It returns the number of bytes skipped: the bytes dropped from inBuffer plus
// the bytes read up to (but not including) the terminator. A non-nil error from
// the reader is returned together with the count accumulated so far, and a read
// that yields zero bytes returns streambuf.ErrNoMoreBytes. If dropping the
// buffered bytes fails, it returns 0 and that error.
//
// NOTE(review): each read chunk is searched on its own (bytes.Index over
// buf[:n]), so a multi-byte terminator split across two reads would presumably
// not be detected -- confirm whether this matters for the supported encodings.
func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
203+
// The length of the line skipped
204+
skipped := r.inBuffer.Len()
205+
206+
// Clean up the buffer
207+
err := r.inBuffer.Advance(skipped)
208+
r.inBuffer.Reset()
209+
210+
// Reset inOffset
211+
r.inOffset = 0
212+
213+
if err != nil {
214+
return 0, err
215+
}
216+
217+
// Read until the new line is found
218+
for idx := -1; idx == -1; {
219+
n, err := r.reader.Read(buf)
220+
221+
// Check bytes read for newLine
222+
if n > 0 {
223+
idx = bytes.Index(buf[:n], r.nl)
224+
225+
if idx != -1 {
226+
r.inBuffer.Append(buf[idx+len(r.nl) : n])
227+
skipped += idx
228+
} else {
229+
skipped += n
230+
}
231+
}
232+
233+
if err != nil {
234+
return skipped, err
235+
}
236+
237+
if n == 0 {
238+
return skipped, streambuf.ErrNoMoreBytes
239+
}
240+
}
241+
242+
return skipped, nil
243+
}
244+
175245
func (r *LineReader) decode(end int) (int, error) {
176246
var err error
177247
buffer := make([]byte, 1024)

0 commit comments

Comments
 (0)