Skip to content

Commit 6896e77

Browse files
authored
Handle data returned with io.EOF in LineReader (#26260)
The libbeat LineReader implementation did not handle the case where the underlying io.Reader returns data together with io.EOF from the same Read call; the returned data was discarded in this case. Per the io.Reader contract, "a Reader returning a non-zero number of bytes at the end of the input stream may return either err == EOF or err == nil." This occurs often with gzip.Reader, which returns a large chunk of data at the end of the file and io.EOF at the same time.
1 parent c25fca8 commit 6896e77

3 files changed

Lines changed: 45 additions & 5 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
282282
- Fix CredentialsJSON unpacking for `gcp-pubsub` and `httpjson` inputs. {pull}23277[23277]
283283
- Fix issue with m365_defender when parsing incidents that have no alerts attached. {pull}25421[25421]
284284
- Fix default config template values for paths on oracle module: {pull}26276[26276]
285+
- Fix bug in aws-s3 input where the end of gzipped log files might have been discarded. {pull}26260[26260]
285286

286287
*Filebeat*
287288

libbeat/reader/readfile/line.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,11 @@ func (r *LineReader) advance() error {
138138
// Try to read more bytes into buffer
139139
n, err := r.reader.Read(buf)
140140

141+
if err == io.EOF && n > 0 {
142+
// Continue processing the returned bytes. The next call will yield EOF with 0 bytes.
143+
err = nil
144+
}
145+
141146
// Appends buffer also in case of err
142147
r.inBuffer.Append(buf[:n])
143148
if err != nil {

libbeat/reader/readfile/line_test.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -219,19 +219,25 @@ func testReadLineLengths(t *testing.T, lineLengths []int) {
219219
lines = append(lines, inputLine)
220220
}
221221

222-
testReadLines(t, lines)
222+
testReadLines(t, lines, false)
223223
}
224224

225-
func testReadLines(t *testing.T, inputLines [][]byte) {
225+
func testReadLines(t *testing.T, inputLines [][]byte, eofOnLastRead bool) {
226226
var inputStream []byte
227227
for _, line := range inputLines {
228228
inputStream = append(inputStream, line...)
229229
}
230230

231231
// initialize reader
232232
buffer := bytes.NewBuffer(inputStream)
233-
codec, _ := encoding.Plain(buffer)
234-
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, buffer.Len(), LineFeed, unlimited})
233+
234+
var r io.Reader = buffer
235+
if eofOnLastRead {
236+
r = &eofWithNonZeroNumberOfBytesReader{buf: buffer}
237+
}
238+
239+
codec, _ := encoding.Plain(r)
240+
reader, err := NewLineReader(ioutil.NopCloser(r), Config{codec, buffer.Len(), LineFeed, unlimited})
235241
if err != nil {
236242
t.Fatalf("Error initializing reader: %v", err)
237243
}
@@ -255,7 +261,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
255261
}
256262

257263
func testReadLine(t *testing.T, line []byte) {
258-
testReadLines(t, [][]byte{line})
264+
testReadLines(t, [][]byte{line}, false)
259265
}
260266

261267
func randomInt(r *rand.Rand, min, max int) int {
@@ -425,3 +431,31 @@ func TestBufferSize(t *testing.T) {
425431
require.Equal(t, string(b[:n]), lines[i])
426432
}
427433
}
434+
435+
// eofWithNonZeroNumberOfBytesReader is an io.Reader implementation that at the
436+
// end of the stream returns a non-zero number of bytes with io.EOF. This is
437+
// allowed under the io.Reader interface contract and must be handled by the
438+
// line reader.
439+
type eofWithNonZeroNumberOfBytesReader struct {
440+
buf *bytes.Buffer
441+
}
442+
443+
func (r *eofWithNonZeroNumberOfBytesReader) Read(d []byte) (int, error) {
444+
n, err := r.buf.Read(d)
445+
if err != nil {
446+
return n, err
447+
}
448+
449+
// As per the io.Reader contract:
450+
// "a Reader returning a non-zero number of bytes at the end of the input
451+
// stream may return either err == EOF or err == nil."
452+
if r.buf.Len() == 0 {
453+
return n, io.EOF
454+
}
455+
return n, nil
456+
}
457+
458+
// Verify handling of the io.Reader returning n > 0 with io.EOF.
459+
func TestReadWithNonZeroNumberOfBytesAndEOF(t *testing.T) {
460+
testReadLines(t, [][]byte{[]byte("Hello world!\n")}, true)
461+
}

0 commit comments

Comments
 (0)