Skip to content

Commit 75ebd50

Browse files
Inject Beat specific default processors (#34149)
Inject Beat specific default processors (#34149)
1 parent 1924467 commit 75ebd50

6 files changed

Lines changed: 129 additions & 8 deletions

File tree

x-pack/auditbeat/cmd/root.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/elastic/beats/v7/x-pack/libbeat/management"
1515
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1616
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
17+
"github.com/elastic/elastic-agent-libs/mapstr"
1718

1819
// Register Auditbeat x-pack modules.
1920
_ "github.com/elastic/beats/v7/x-pack/auditbeat/include"
@@ -29,7 +30,8 @@ var RootCmd *cmd.BeatsRootCmd
2930
// auditbeatCfg is a callback registered with central management to perform any needed config transformations
3031
// before agent configs are sent to a beat
3132
func auditbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
32-
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
33+
procs := defaultProcessors()
34+
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...)
3335
if err != nil {
3436
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
3537
}
@@ -57,3 +59,15 @@ func init() {
5759
settings.ElasticLicensed = true
5860
RootCmd = auditbeatcmd.Initialize(settings)
5961
}
62+
63+
func defaultProcessors() []mapstr.M {
64+
// processors:
65+
// - add_host_metadata: ~
66+
// - add_cloud_metadata: ~
67+
// - add_docker_metadata: ~
68+
return []mapstr.M{
69+
{"add_host_metadata": nil},
70+
{"add_cloud_metadata": nil},
71+
{"add_docker_metadata": nil},
72+
}
73+
}

x-pack/filebeat/cmd/agent.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ import (
1111
"github.com/elastic/beats/v7/x-pack/libbeat/management"
1212
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1313
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
14+
"github.com/elastic/elastic-agent-libs/mapstr"
1415
)
1516

1617
func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
17-
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
18+
procs := defaultProcessors()
19+
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...)
1820
if err != nil {
1921
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
2022
}
@@ -35,3 +37,22 @@ func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) (
3537

3638
return configList, nil
3739
}
40+
41+
func defaultProcessors() []mapstr.M {
42+
// processors:
43+
// - add_host_metadata:
44+
// when.not.contains.tags: forwarded
45+
// - add_cloud_metadata: ~
46+
// - add_docker_metadata: ~
47+
// - add_kubernetes_metadata: ~
48+
return []mapstr.M{
49+
{
50+
"add_host_metadata": mapstr.M{
51+
"when.not.contains.tags": "forwarded",
52+
},
53+
},
54+
{"add_cloud_metadata": nil},
55+
{"add_docker_metadata": nil},
56+
{"add_kubernetes_metadata": nil},
57+
}
58+
}

x-pack/libbeat/management/generate.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (r *TransformRegister) Transform(cfg *proto.UnitExpectedConfig, agentInfo *
6767
// CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values
6868
// that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/
6969
// This also performs the basic task of inserting module-level add_field processors into the inputs/modules.
70-
func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, agentInfo *client.AgentInfo) ([]map[string]interface{}, error) {
70+
func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) {
7171
// should this be an error?
7272
if raw.GetStreams() == nil {
7373
return []map[string]interface{}{}, nil
@@ -94,7 +94,7 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, ag
9494
}
9595

9696
// 3. stream processors
97-
streamSource, err = injectStreamProcessors(raw, inputType, stream, streamSource)
97+
streamSource, err = injectStreamProcessors(raw, inputType, stream, streamSource, defaultProcessors)
9898
if err != nil {
9999
return nil, fmt.Errorf("Error injecting stream processors: %w", err)
100100
}
@@ -181,13 +181,20 @@ func injectIndexStream(dataStreamType string, expected *proto.UnitExpectedConfig
181181

182182
//injectStreamProcessors is an emulation of the InjectStreamProcessorRule AST code
183183
// this adds a variety of processors for metadata related to the dataset and input config.
184-
func injectStreamProcessors(expected *proto.UnitExpectedConfig, dataStreamType string, streamExpected *proto.Stream, stream map[string]interface{}) (map[string]interface{}, error) {
184+
func injectStreamProcessors(expected *proto.UnitExpectedConfig, dataStreamType string, streamExpected *proto.Stream, stream map[string]interface{}, defaultProcessors []mapstr.M) (map[string]interface{}, error) {
185185
//1. start by "repairing" config to add any missing fields
186186
// logic from datastreamTypeFromInputNode
187187
procInputType, procInputDataset, procInputNamespace := metadataFromDatastreamValues(dataStreamType, expected, streamExpected)
188188

189189
var processors = []interface{}{}
190190

191+
for _, p := range defaultProcessors {
192+
if len(p) == 0 {
193+
continue
194+
}
195+
processors = append(processors, p)
196+
}
197+
191198
// In V1, AST injects input_id at the input level and not the stream level,
192199
// for reasons I can't understand, as it just ends up shuffling it around
193200
// to individual metricsets anyway, at least on metricbeat

x-pack/metricbeat/cmd/agent.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ import (
1212
"github.com/elastic/beats/v7/x-pack/libbeat/management"
1313
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1414
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
15+
"github.com/elastic/elastic-agent-libs/mapstr"
1516
)
1617

1718
func metricbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
18-
modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo)
19+
procs := defaultProcessors()
20+
modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo, procs...)
1921
if err != nil {
2022
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
2123
}
@@ -36,3 +38,17 @@ func metricbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo)
3638

3739
return configList, nil
3840
}
41+
42+
func defaultProcessors() []mapstr.M {
43+
// processors:
44+
// - add_host_metadata: ~
45+
// - add_cloud_metadata: ~
46+
// - add_docker_metadata: ~
47+
// - add_kubernetes_metadata: ~
48+
return []mapstr.M{
49+
{"add_host_metadata": nil},
50+
{"add_cloud_metadata": nil},
51+
{"add_docker_metadata": nil},
52+
{"add_kubernetes_metadata": nil},
53+
}
54+
}

x-pack/osquerybeat/cmd/root.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo
8181
}
8282
rawIn.Streams = streams
8383

84-
modules, err := management.CreateInputsFromStreams(rawIn, "osquery", agentInfo)
84+
procs := defaultProcessors()
85+
modules, err := management.CreateInputsFromStreams(rawIn, "osquery", agentInfo, procs...)
8586
if err != nil {
8687
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
8788
}
@@ -96,3 +97,13 @@ func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo
9697
}
9798
return configList, nil
9899
}
100+
101+
func defaultProcessors() []mapstr.M {
102+
// processors:
103+
// - add_host_metadata: ~
104+
// - add_cloud_metadata: ~
105+
return []mapstr.M{
106+
{"add_host_metadata": nil},
107+
{"add_cloud_metadata": nil},
108+
}
109+
}

x-pack/packetbeat/cmd/root.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/elastic/elastic-agent-client/v7/pkg/client"
1515
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
1616
conf "github.com/elastic/elastic-agent-libs/config"
17+
"github.com/elastic/elastic-agent-libs/mapstr"
1718

1819
_ "github.com/elastic/beats/v7/x-pack/libbeat/include"
1920

@@ -31,7 +32,8 @@ var RootCmd *cmd.BeatsRootCmd
3132
// configuration generated from a raw Elastic Agent config
3233
func packetbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
3334
//grab and properly format the input streams
34-
inputStreams, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
35+
procs := defaultProcessors()
36+
inputStreams, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...)
3537
if err != nil {
3638
return nil, fmt.Errorf("error generating new stream config: %w", err)
3739
}
@@ -57,3 +59,53 @@ func init() {
5759
settings.ElasticLicensed = true
5860
RootCmd = packetbeatCmd.Initialize(settings)
5961
}
62+
63+
func defaultProcessors() []mapstr.M {
64+
// processors:
65+
// - # Add forwarded to tags when processing data from a network tap or mirror.
66+
// if.contains.tags: forwarded
67+
// then:
68+
// - drop_fields:
69+
// fields: [host]
70+
// else:
71+
// - add_host_metadata: ~
72+
// - add_cloud_metadata: ~
73+
// - add_docker_metadata: ~
74+
// - detect_mime_type:
75+
// field: http.request.body.content
76+
// target: http.request.mime_type
77+
// - detect_mime_type:
78+
// field: http.response.body.content
79+
// target: http.response.mime_type
80+
return []mapstr.M{
81+
{
82+
"if.contains.tags": "forwarded",
83+
"then": []interface{}{
84+
mapstr.M{
85+
"drop_fields": mapstr.M{
86+
"fields": []interface{}{"host"},
87+
},
88+
},
89+
},
90+
"else": []interface{}{
91+
mapstr.M{
92+
"add_host_metadata": nil,
93+
},
94+
},
95+
},
96+
{"add_cloud_metadata": nil},
97+
{"add_docker_metadata": nil},
98+
{
99+
"detect_mime_type": mapstr.M{
100+
"field": "http.request.body.content",
101+
"target": "http.request.mime_type",
102+
},
103+
},
104+
{
105+
"detect_mime_type": mapstr.M{
106+
"field": "http.response.body.content",
107+
"target": "http.response.mime_type",
108+
},
109+
},
110+
}
111+
}

0 commit comments

Comments
 (0)