Skip to content

Commit 655984e

Browse files
authored
Properly update offset in case of unparsable line (elastic#22685)
1 parent ade9a88 commit 655984e

8 files changed

Lines changed: 92 additions & 23 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
360360
- Fix handling of missing eventtime and assignip field being set to N/A for fortinet module. {pull}22361[22361]
361361
- Fix Zeek dashboard reference to `zeek.ssl.server.name` field. {pull}21696[21696]
362362
- Fix for `field [source] not present as part of path [source.ip]` error in azure pipelines. {pull}22377[22377]
363+
- Properly update offset in case of unparsable line. {pull}22685[22685]
363364
- Drop aws.vpcflow.pkt_srcaddr and aws.vpcflow.pkt_dstaddr when equal to "-". {pull}22721[22721] {issue}22716[22716]
364365
- Fix cisco umbrella module config by adding input variable. {pull}22892[22892]
365366
- Fix network.direction logic in zeek connection fileset. {pull}22967[22967]

filebeat/input/filestream/input.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,6 @@ func (inp *filestream) readFromSource(
298298
s.Offset = 0
299299
case ErrClosed:
300300
log.Info("Reader was closed. Closing.")
301-
case reader.ErrLineUnparsable:
302-
log.Info("Skipping unparsable line in file.")
303-
continue
304301
default:
305302
log.Errorf("Read line error: %v", err)
306303
}

filebeat/input/log/harvester.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,6 @@ func (h *Harvester) Run() error {
331331
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
332332
case ErrInactive:
333333
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
334-
case reader.ErrLineUnparsable:
335-
logp.Info("Skipping unparsable line in file: %v", h.state.Source)
336-
//line unparsable, go to next line
337-
continue
338334
default:
339335
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
340336
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
2+
{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}
3+
{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
4+
{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"}
5+
{"log":"cp etc/packetbeat.template.json /build/packetbeat.template.json\n","stream":"stdout","time":"2016-03-02T22:59:04.639782988Z"}
6+
{"log":"# linux\n","stream":"stdout","time":"2016-03-02T22:59:04.646276053Z"}
7+
"log":"cp packetbeat.yml /build/packetbeat-linux.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.647847045Z"}
8+
{"log":"# binary\n","stream":"stdout","time":"2016-03-02T22:59:04.653740138Z"}
9+
{"log":"cp packetbeat.yml /build/packetbeat-binary.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.655979016Z"}
10+
{"log":"# darwin\n","stream":"stdout","time":"2016-03-02T22:59:04.661181197Z"}
11+
{"log":"cp packetbeat.yml /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.662859769Z"}
12+
{"log":"sed -i.bk 's/device: any/device: en0/' /build/packetbeat-darwin.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.66649744Z"}
13+
{"log":"rm /build/packetbeat-darwin.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.701199002Z"}
14+
{"log":"# win\n","stream":"stdout","time":"2016-03-02T22:59:04.705067809Z"}
15+
{"log":"cp packetbeat.yml /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.706629907Z"}
16+
{"log":"sed -i.bk 's/device: any/device: 0/' /build/packetbeat-win.yml\n","stream":"stdout","time":"2016-03-02T22:59:04.711993313Z"}
17+
{"log":"rm /build/packetbeat-win.yml.bk\n","stream":"stdout","time":"2016-03-02T22:59:04.757913979Z"}
18+
{"log":"Compiling for windows/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:04.761895467Z"}
19+
{"log":"Compiling for windows/386...\n","stream":"stdout","time":"2016-03-02T22:59:29.481736885Z"}
20+
{"log":"Compiling for darwin/amd64...\n","stream":"stdout","time":"2016-03-02T22:59:55.205334574Z"}
21+
{"log":"Moving binaries to host...\n","stream":"stdout","time":"2016-03-02T23:00:15.140397826Z"}

filebeat/tests/system/test_container.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,42 @@ def test_container_input_cri(self):
6666
output = self.read_output()
6767
assert len(output) == 1
6868
assert output[0]["stream"] == "stdout"
69+
70+
def test_container_input_registry_for_unparsable_lines(self):
71+
"""
72+
Test container input properly updates registry offset in case
73+
of unparsable lines
74+
"""
75+
input_raw = """
76+
- type: container
77+
paths:
78+
- {}/logs/*.log
79+
"""
80+
self.render_config_template(
81+
input_raw=input_raw.format(os.path.abspath(self.working_dir)),
82+
inputs=False,
83+
)
84+
85+
os.mkdir(self.working_dir + "/logs/")
86+
self.copy_files(["logs/docker_corrupted.log"],
87+
target_dir="logs")
88+
89+
filebeat = self.start_beat()
90+
91+
self.wait_until(lambda: self.output_has(lines=20))
92+
93+
filebeat.check_kill_and_wait()
94+
95+
output = self.read_output()
96+
assert len(output) == 20
97+
assert output[19]["message"] == "Moving binaries to host..."
98+
for o in output:
99+
assert o["stream"] == "stdout"
100+
101+
# Check that file exist
102+
data = self.get_registry()
103+
logs = self.log_access()
104+
assert logs.contains("Parse line error") == True
105+
# bytes of healthy file are 2244 so for the corrupted one should
106+
# be 2244-1=2243 since we removed one character
107+
assert data[0]["offset"] == 2243

libbeat/reader/reader.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package reader
1919

2020
import (
21-
"errors"
2221
"io"
2322
)
2423

@@ -30,8 +29,3 @@ type Reader interface {
3029
io.Closer
3130
Next() (Message, error)
3231
}
33-
34-
var (
35-
//ErrLineUnparsable is error thrown when Next() element from input is corrupted and can not be parsed
36-
ErrLineUnparsable = errors.New("line is unparsable")
37-
)

libbeat/reader/readjson/docker_json.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
202202
err = p.parseLine(&message, &logLine)
203203
if err != nil {
204204
p.logger.Errorf("Parse line error: %v", err)
205-
return message, reader.ErrLineUnparsable
205+
continue
206206
}
207207

208208
// Handle multiline messages, join partial lines
@@ -219,7 +219,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
219219
err = p.parseLine(&next, &logLine)
220220
if err != nil {
221221
p.logger.Errorf("Parse line error: %v", err)
222-
return message, reader.ErrLineUnparsable
222+
continue
223223
}
224224
message.Content = append(message.Content, next.Content...)
225225
}

libbeat/reader/readjson/docker_json_test.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package readjson
1919

2020
import (
21+
"io"
2122
"testing"
2223
"time"
2324

@@ -53,7 +54,7 @@ func TestDockerJSON(t *testing.T) {
5354
name: "Wrong JSON",
5455
input: [][]byte{[]byte(`this is not JSON`)},
5556
stream: "all",
56-
expectedError: reader.ErrLineUnparsable,
57+
expectedError: io.EOF,
5758
expectedMessage: reader.Message{
5859
Bytes: 16,
5960
},
@@ -73,7 +74,7 @@ func TestDockerJSON(t *testing.T) {
7374
name: "Wrong CRI",
7475
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout`)},
7576
stream: "all",
76-
expectedError: reader.ErrLineUnparsable,
77+
expectedError: io.EOF,
7778
expectedMessage: reader.Message{
7879
Bytes: 37,
7980
},
@@ -82,7 +83,7 @@ func TestDockerJSON(t *testing.T) {
8283
name: "Wrong CRI",
8384
input: [][]byte{[]byte(`{this is not JSON nor CRI`)},
8485
stream: "all",
85-
expectedError: reader.ErrLineUnparsable,
86+
expectedError: io.EOF,
8687
expectedMessage: reader.Message{
8788
Bytes: 25,
8889
},
@@ -91,7 +92,7 @@ func TestDockerJSON(t *testing.T) {
9192
name: "Missing time",
9293
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
9394
stream: "all",
94-
expectedError: reader.ErrLineUnparsable,
95+
expectedError: io.EOF,
9596
expectedMessage: reader.Message{
9697
Bytes: 82,
9798
},
@@ -218,7 +219,7 @@ func TestDockerJSON(t *testing.T) {
218219
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
219220
stream: "all",
220221
format: "cri",
221-
expectedError: reader.ErrLineUnparsable,
222+
expectedError: io.EOF,
222223
expectedMessage: reader.Message{
223224
Bytes: 82,
224225
},
@@ -228,7 +229,7 @@ func TestDockerJSON(t *testing.T) {
228229
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
229230
stream: "all",
230231
format: "docker",
231-
expectedError: reader.ErrLineUnparsable,
232+
expectedError: io.EOF,
232233
expectedMessage: reader.Message{
233234
Bytes: 115,
234235
},
@@ -300,7 +301,7 @@ func TestDockerJSON(t *testing.T) {
300301
[]byte(`{"log":"shutdown...\n","stream`),
301302
},
302303
stream: "stdout",
303-
expectedError: reader.ErrLineUnparsable,
304+
expectedError: io.EOF,
304305
expectedMessage: reader.Message{
305306
Bytes: 139,
306307
},
@@ -324,11 +325,25 @@ func TestDockerJSON(t *testing.T) {
324325
name: "Corrupted log message line",
325326
input: [][]byte{[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`)},
326327
stream: "all",
327-
expectedError: reader.ErrLineUnparsable,
328+
expectedError: io.EOF,
328329
expectedMessage: reader.Message{
329330
Bytes: 97,
330331
},
331332
},
333+
{
334+
name: "Corrupted log message line is skipped, keep correct bytes count",
335+
input: [][]byte{
336+
[]byte(`36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
337+
[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`),
338+
},
339+
stream: "all",
340+
expectedMessage: reader.Message{
341+
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested"),
342+
Fields: common.MapStr{"stream": "stdout"},
343+
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC),
344+
Bytes: 205,
345+
},
346+
},
332347
}
333348

334349
for _, test := range tests {
@@ -358,6 +373,12 @@ type mockReader struct {
358373
}
359374

360375
func (m *mockReader) Next() (reader.Message, error) {
376+
if len(m.messages) < 1 {
377+
return reader.Message{
378+
Content: []byte{},
379+
Bytes: 0,
380+
}, io.EOF
381+
}
361382
message := m.messages[0]
362383
m.messages = m.messages[1:]
363384
return reader.Message{

0 commit comments

Comments
 (0)