Skip to content

Commit 30331bc

Browse files
authored
Add support for parsers in filestream input (#24763)
This PR adds support for parsers in the `filestream` input. Example configuration that aggregates five lines into a single event and parses the JSON contents: ```yaml - type: filestream enabled: true paths: - test.log parsers: - multiline: type: count count_lines: 5 skip_newline: true - ndjson: fields_under_root: true ```
1 parent 7e1d9e4 commit 30331bc

10 files changed

Lines changed: 1200 additions & 26 deletions

File tree

filebeat/input/filestream/config.go

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type readerConfig struct {
7070
MaxBytes int `config:"message_max_bytes" validate:"min=0,nonzero"`
7171
Tail bool `config:"seek_to_tail"`
7272

73-
Parsers []*common.ConfigNamespace `config:"parsers"` // TODO multiline, json, syslog?
73+
Parsers []common.ConfigNamespace `config:"parsers"`
7474
}
7575

7676
type backoffConfig struct {
@@ -115,34 +115,18 @@ func defaultReaderConfig() readerConfig {
115115
LineTerminator: readfile.AutoLineTerminator,
116116
MaxBytes: 10 * humanize.MiByte,
117117
Tail: false,
118-
Parsers: nil,
118+
Parsers: make([]common.ConfigNamespace, 0),
119119
}
120120
}
121121

122122
func (c *config) Validate() error {
123123
if len(c.Paths) == 0 {
124124
return fmt.Errorf("no path is configured")
125125
}
126-
// TODO
127-
//if c.CleanInactive != 0 && c.IgnoreOlder == 0 {
128-
// return fmt.Errorf("ignore_older must be enabled when clean_inactive is used")
129-
//}
130-
131-
// TODO
132-
//if c.CleanInactive != 0 && c.CleanInactive <= c.IgnoreOlder+c.ScanFrequency {
133-
// return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed")
134-
//}
135-
136-
// TODO
137-
//if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
138-
// c.Multiline != nil {
139-
// return fmt.Errorf("When using the JSON decoder and multiline together, you need to specify a message_key value")
140-
//}
141-
142-
//if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
143-
// (len(c.IncludeLines) > 0 || len(c.ExcludeLines) > 0) {
144-
// return fmt.Errorf("When using the JSON decoder and line filtering together, you need to specify a message_key value")
145-
//}
126+
127+
if err := validateParserConfig(parserConfig{maxBytes: c.Reader.MaxBytes, lineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
128+
return fmt.Errorf("cannot parse parser configuration: %+v", err)
129+
}
146130

147131
return nil
148132
}

filebeat/input/filestream/environment_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,39 @@ func (e *inputTestingEnvironment) getOutputMessages() []string {
345345
return messages
346346
}
347347

348+
func (e *inputTestingEnvironment) requireEventContents(nr int, key, value string) {
349+
events := make([]beat.Event, 0)
350+
for _, c := range e.pipeline.clients {
351+
for _, evt := range c.GetEvents() {
352+
events = append(events, evt)
353+
}
354+
}
355+
356+
selectedEvent := events[nr]
357+
v, err := selectedEvent.Fields.GetValue(key)
358+
if err != nil {
359+
e.t.Fatalf("cannot find key %s in event %+v", key, selectedEvent)
360+
}
361+
362+
val, ok := v.(string)
363+
if !ok {
364+
e.t.Fatalf("value is not string %+v", v)
365+
}
366+
require.Equal(e.t, value, val)
367+
}
368+
369+
func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
370+
events := make([]beat.Event, 0)
371+
for _, c := range e.pipeline.clients {
372+
for _, evt := range c.GetEvents() {
373+
events = append(events, evt)
374+
}
375+
}
376+
377+
selectedEvent := events[nr]
378+
require.Equal(e.t, ts, selectedEvent.Timestamp.String())
379+
}
380+
348381
type testInputStore struct {
349382
registry *statestore.Registry
350383
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// +build integration
19+
20+
package filestream
21+
22+
const (
23+
elasticsearchMultilineLogs = `[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z]
24+
[2015-12-06 01:44:16,736][INFO ][node ] [Zach] initializing ...
25+
[2015-12-06 01:44:16,804][INFO ][plugins ] [Zach] loaded [], sites []
26+
[2015-12-06 01:44:16,941][INFO ][env ] [Zach] using [1] data paths, mounts [[/ (/dev/disk1)]], net usable_space [66.3gb], net total_space [232.6gb], spins? [unknown], types [hfs]
27+
[2015-12-06 01:44:19,177][INFO ][node ] [Zach] initialized
28+
[2015-12-06 01:44:19,177][INFO ][node ] [Zach] starting ...
29+
[2015-12-06 01:44:19,356][INFO ][transport ] [Zach] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[fe80::1]:9300}, {[::1]:9300}
30+
[2015-12-06 01:44:19,367][INFO ][discovery ] [Zach] elasticsearch/qfPw9z0HQe6grbJQruTCJQ
31+
[2015-12-06 01:44:22,405][INFO ][cluster.service ] [Zach] new_master {Zach}{qfPw9z0HQe6grbJQruTCJQ}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-join(elected_as_master, [0] joins received)
32+
[2015-12-06 01:44:22,432][INFO ][http ] [Zach] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[fe80::1]:9200}, {[::1]:9200}
33+
[2015-12-06 01:44:22,432][INFO ][node ] [Zach] started
34+
[2015-12-06 01:44:22,446][INFO ][gateway ] [Zach] recovered [0] indices into cluster_state
35+
[2015-12-06 01:44:52,882][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] creating index, cause [auto(bulk api)], templates [], shards [5]/[1], mappings [process, system]
36+
[2015-12-06 01:44:53,256][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [process]
37+
[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process]
38+
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}]
39+
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
40+
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
41+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
42+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
43+
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
44+
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
45+
at java.lang.Thread.run(Thread.java:745)
46+
[2015-12-06 01:44:53,274][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][0] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vcVA0hoJdODMTz], source[{"@timestamp":"2015-12-06T00:44:52.448Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":1902,"user_p":0,"system":941,"total":2843,"start_time":"Dec03"},"mem":{"size":3616309248,"rss":156405760,"rss_p":0.01,"share":0},"name":"Google Chrome H","pid":40572,"ppid":392,"state":"running"},"type":"process"}]}
47+
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}]
48+
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
49+
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
50+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
51+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
52+
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
53+
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
54+
at java.lang.Thread.run(Thread.java:745)
55+
[2015-12-06 01:44:53,279][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process]
56+
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}]
57+
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
58+
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
59+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
60+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
61+
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
62+
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
63+
at java.lang.Thread.run(Thread.java:745)
64+
[2015-12-06 01:44:53,280][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][1] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vbVA0hoJdODMTj], source[{"@timestamp":"2015-12-06T00:44:52.416Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":6643,"user_p":0.01,"system":693,"total":7336,"start_time":"01:44"},"mem":{"size":5182656512,"rss":248872960,"rss_p":0.01,"share":0},"name":"java","pid":48553,"ppid":48547,"state":"running"},"type":"process"}]}
65+
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}]
66+
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
67+
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
68+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
69+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
70+
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
71+
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
72+
at java.lang.Thread.run(Thread.java:745)
73+
[2015-12-06 01:44:53,334][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [system]
74+
[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem]
75+
`
76+
77+
elasticsearchMultilineLongLogs = `[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z]
78+
[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process]
79+
MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}]
80+
at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388)
81+
at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388)
82+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225)
83+
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188)
84+
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
85+
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
86+
at java.lang.Thread.run(Thread.java:745)
87+
[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping [filesystem]
88+
`
89+
)

filebeat/input/filestream/input.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type filestream struct {
5757
encodingFactory encoding.EncodingFactory
5858
encoding encoding.Encoding
5959
closerConfig closerConfig
60+
parserConfig []common.ConfigNamespace
61+
msgPostProc []postProcesser
6062
}
6163

6264
// Plugin creates a new filestream input plugin for creating a stateful input.
@@ -216,8 +218,16 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
216218
}
217219

218220
r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
221+
222+
r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers)
223+
if err != nil {
224+
return nil, err
225+
}
226+
219227
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)
220228

229+
inp.msgPostProc = newPostProcessors(inp.readerConfig.Parsers)
230+
221231
return r, nil
222232
}
223233

@@ -369,16 +379,24 @@ func (inp *filestream) eventFromMessage(m reader.Message, path string) beat.Even
369379
},
370380
}
371381
fields.DeepUpdate(m.Fields)
382+
m.Fields = fields
383+
384+
for _, proc := range inp.msgPostProc {
385+
proc.PostProcess(&m)
386+
}
372387

373388
if len(m.Content) > 0 {
374-
if fields == nil {
375-
fields = common.MapStr{}
389+
if m.Fields == nil {
390+
m.Fields = common.MapStr{}
391+
}
392+
if _, ok := m.Fields["message"]; !ok {
393+
m.Fields["message"] = string(m.Content)
376394
}
377-
fields["message"] = string(m.Content)
378395
}
379396

380397
return beat.Event{
381398
Timestamp: m.Ts,
382-
Fields: fields,
399+
Meta: m.Meta,
400+
Fields: m.Fields,
383401
}
384402
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package filestream
19+
20+
import (
21+
"errors"
22+
"fmt"
23+
"io"
24+
25+
"github.com/elastic/beats/v7/libbeat/common"
26+
"github.com/elastic/beats/v7/libbeat/reader"
27+
"github.com/elastic/beats/v7/libbeat/reader/multiline"
28+
"github.com/elastic/beats/v7/libbeat/reader/readfile"
29+
"github.com/elastic/beats/v7/libbeat/reader/readjson"
30+
)
31+
32+
var (
33+
ErrNoSuchParser = errors.New("no such parser")
34+
)
35+
36+
// parser transforms or translates the Content attribute of a Message.
37+
// They are able to aggregate two or more Messages into a single one.
38+
type parser interface {
39+
io.Closer
40+
Next() (reader.Message, error)
41+
}
42+
43+
type parserConfig struct {
44+
maxBytes int
45+
lineTerminator readfile.LineTerminator
46+
}
47+
48+
type postProcesser interface {
49+
PostProcess(*reader.Message)
50+
Name() string
51+
}
52+
53+
func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) {
54+
p := in
55+
56+
parserCheck := make(map[string]int)
57+
for _, ns := range c {
58+
name := ns.Name()
59+
switch name {
60+
case "multiline":
61+
parserCheck["multiline"]++
62+
var config multiline.Config
63+
cfg := ns.Config()
64+
err := cfg.Unpack(&config)
65+
if err != nil {
66+
return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err)
67+
}
68+
p, err = multiline.New(p, "\n", pCfg.maxBytes, &config)
69+
if err != nil {
70+
return nil, fmt.Errorf("error while creating multiline parser: %+v", err)
71+
}
72+
case "ndjson":
73+
parserCheck["ndjson"]++
74+
var config readjson.Config
75+
cfg := ns.Config()
76+
err := cfg.Unpack(&config)
77+
if err != nil {
78+
return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err)
79+
}
80+
p = readjson.NewJSONReader(p, &config)
81+
default:
82+
return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
83+
}
84+
}
85+
86+
// This is a temporary check. In the long run configuring multiple parsers with the same
87+
// type is going to be supported.
88+
if count, ok := parserCheck["multiline"]; ok && count > 1 {
89+
return nil, fmt.Errorf("only one parser is allowed for multiline, got %d", count)
90+
}
91+
if count, ok := parserCheck["ndjson"]; ok && count > 1 {
92+
return nil, fmt.Errorf("only one parser is allowed for ndjson, got %d", count)
93+
}
94+
95+
return p, nil
96+
}
97+
98+
func newPostProcessors(c []common.ConfigNamespace) []postProcesser {
99+
var pp []postProcesser
100+
for _, ns := range c {
101+
name := ns.Name()
102+
switch name {
103+
case "ndjson":
104+
var config readjson.Config
105+
cfg := ns.Config()
106+
cfg.Unpack(&config)
107+
pp = append(pp, readjson.NewJSONPostProcessor(&config))
108+
default:
109+
continue
110+
}
111+
}
112+
113+
return pp
114+
}
115+
116+
func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error {
117+
for _, ns := range c {
118+
name := ns.Name()
119+
switch name {
120+
case "multiline":
121+
var config multiline.Config
122+
cfg := ns.Config()
123+
err := cfg.Unpack(&config)
124+
if err != nil {
125+
return fmt.Errorf("error while parsing multiline parser config: %+v", err)
126+
}
127+
case "ndjson":
128+
var config readjson.Config
129+
cfg := ns.Config()
130+
err := cfg.Unpack(&config)
131+
if err != nil {
132+
return fmt.Errorf("error while parsing ndjson parser config: %+v", err)
133+
}
134+
default:
135+
return fmt.Errorf("%s: %s", ErrNoSuchParser, name)
136+
}
137+
}
138+
139+
return nil
140+
}

0 commit comments

Comments
 (0)