Skip to content

Commit dba8f74

Browse files
vjsamuelCarlos Pérez-Aradros Herce
authored andcommitted
Refactor metadata generator to support adding metadata across resources (#14875)
* Refactor metagen to allow multiple resources to be enriched
1 parent 0fd2250 commit dba8f74

33 files changed

Lines changed: 2079 additions & 743 deletions

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
2929
- Libbeat: Do not overwrite agent.*, ecs.version, and host.name. {pull}14407[14407]
3030
- Libbeat: Cleanup the x-pack licenser code to use the new license endpoint and the new format. {pull}15091[15091]
3131
- Users can now specify `monitoring.cloud.*` to override `monitoring.elasticsearch.*` settings. {issue}14399[14399] {pull}15254[15254]
32+
- Refactor metadata generator to support adding metadata across resources {pull}14875[14875]
3233
- Update to ECS 1.4.0. {pull}14844[14844]
3334

3435
*Auditbeat*

libbeat/autodiscover/providers/kubernetes/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"fmt"
2424
"time"
2525

26+
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
27+
2628
"github.com/elastic/beats/libbeat/autodiscover/template"
2729
"github.com/elastic/beats/libbeat/common"
2830
"github.com/elastic/beats/libbeat/common/cfgwarn"
@@ -48,6 +50,8 @@ type Config struct {
4850
Builders []*common.Config `config:"builders"`
4951
Appenders []*common.Config `config:"appenders"`
5052
Templates template.MapperSettings `config:"templates"`
53+
54+
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
5155
}
5256

5357
func defaultConfig() *Config {

libbeat/autodiscover/providers/kubernetes/node.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,26 @@ import (
2929
"github.com/elastic/beats/libbeat/common"
3030
"github.com/elastic/beats/libbeat/common/bus"
3131
"github.com/elastic/beats/libbeat/common/kubernetes"
32+
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
3233
"github.com/elastic/beats/libbeat/common/safemapstr"
3334
"github.com/elastic/beats/libbeat/logp"
3435
)
3536

3637
type node struct {
3738
uuid uuid.UUID
3839
config *Config
39-
metagen kubernetes.MetaGenerator
40+
metagen metadata.MetaGen
4041
logger *logp.Logger
4142
publish func(bus.Event)
4243
watcher kubernetes.Watcher
4344
}
4445

4546
// NewNodeEventer creates an eventer that can discover and process node objects
4647
func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
47-
metagen, err := kubernetes.NewMetaGenerator(cfg)
48-
if err != nil {
49-
return nil, err
50-
}
51-
5248
logger := logp.NewLogger("autodiscover.node")
5349

5450
config := defaultConfig()
55-
err = cfg.Unpack(&config)
51+
err := cfg.Unpack(&config)
5652
if err != nil {
5753
return nil, err
5854
}
@@ -70,7 +66,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
7066
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
7167
SyncTimeout: config.SyncPeriod,
7268
Node: config.Node,
73-
})
69+
}, nil)
7470

7571
if err != nil {
7672
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
@@ -80,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
8076
config: config,
8177
uuid: uuid,
8278
publish: publish,
83-
metagen: metagen,
79+
metagen: metadata.NewNodeMetadataGenerator(cfg, watcher.Store()),
8480
logger: logger,
8581
watcher: watcher,
8682
}
@@ -172,11 +168,7 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
172168
}
173169

174170
eventID := fmt.Sprint(node.GetObjectMeta().GetUID())
175-
meta := n.metagen.ResourceMetadata(node)
176-
177-
// TODO: Refactor metagen to make sure that this is seamless
178-
meta.Put("node.name", node.Name)
179-
meta.Put("node.uid", string(node.GetObjectMeta().GetUID()))
171+
meta := n.metagen.Generate(node)
180172

181173
kubemeta := meta.Clone()
182174
// Pass annotations to all events so that it can be used in templating and by annotation builders.

libbeat/autodiscover/providers/kubernetes/node_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
25+
2426
"github.com/gofrs/uuid"
2527
"github.com/stretchr/testify/assert"
2628
v1 "k8s.io/api/core/v1"
@@ -114,6 +116,11 @@ func TestEmitEvent_Node(t *testing.T) {
114116
nodeIP := "192.168.0.1"
115117
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
116118
UUID, err := uuid.NewV4()
119+
120+
typeMeta := metav1.TypeMeta{
121+
Kind: "Node",
122+
APIVersion: "v1",
123+
}
117124
if err != nil {
118125
t.Fatal(err)
119126
}
@@ -134,6 +141,7 @@ func TestEmitEvent_Node(t *testing.T) {
134141
Labels: map[string]string{},
135142
Annotations: map[string]string{},
136143
},
144+
TypeMeta: typeMeta,
137145
Status: v1.NodeStatus{
138146
Addresses: []v1.NodeAddress{
139147
{
@@ -180,7 +188,8 @@ func TestEmitEvent_Node(t *testing.T) {
180188
Labels: map[string]string{},
181189
Annotations: map[string]string{},
182190
},
183-
Status: v1.NodeStatus{},
191+
TypeMeta: typeMeta,
192+
Status: v1.NodeStatus{},
184193
},
185194
Expected: nil,
186195
},
@@ -194,6 +203,7 @@ func TestEmitEvent_Node(t *testing.T) {
194203
Labels: map[string]string{},
195204
Annotations: map[string]string{},
196205
},
206+
TypeMeta: typeMeta,
197207
Status: v1.NodeStatus{
198208
Addresses: []v1.NodeAddress{},
199209
Conditions: []v1.NodeCondition{
@@ -236,11 +246,7 @@ func TestEmitEvent_Node(t *testing.T) {
236246
t.Fatal(err)
237247
}
238248

239-
metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
240-
if err != nil {
241-
t.Fatal(err)
242-
}
243-
249+
metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil)
244250
p := &Provider{
245251
config: defaultConfig(),
246252
bus: bus.New("test"),

libbeat/autodiscover/providers/kubernetes/pod.go

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,34 +25,31 @@ import (
2525
k8s "k8s.io/client-go/kubernetes"
2626

2727
"github.com/elastic/beats/libbeat/autodiscover/builder"
28-
2928
"github.com/elastic/beats/libbeat/common"
3029
"github.com/elastic/beats/libbeat/common/bus"
3130
"github.com/elastic/beats/libbeat/common/kubernetes"
31+
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
3232
"github.com/elastic/beats/libbeat/common/safemapstr"
3333
"github.com/elastic/beats/libbeat/logp"
3434
)
3535

3636
type pod struct {
37-
uuid uuid.UUID
38-
config *Config
39-
metagen kubernetes.MetaGenerator
40-
logger *logp.Logger
41-
publish func(bus.Event)
42-
watcher kubernetes.Watcher
37+
uuid uuid.UUID
38+
config *Config
39+
metagen metadata.MetaGen
40+
logger *logp.Logger
41+
publish func(bus.Event)
42+
watcher kubernetes.Watcher
43+
nodeWatcher kubernetes.Watcher
44+
namespaceWatcher kubernetes.Watcher
4345
}
4446

4547
// NewPodEventer creates an eventer that can discover and process pod objects
4648
func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
47-
metagen, err := kubernetes.NewMetaGenerator(cfg)
48-
if err != nil {
49-
return nil, err
50-
}
51-
5249
logger := logp.NewLogger("autodiscover.pod")
5350

5451
config := defaultConfig()
55-
err = cfg.Unpack(&config)
52+
err := cfg.Unpack(&config)
5653
if err != nil {
5754
return nil, err
5855
}
@@ -71,18 +68,52 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
7168
SyncTimeout: config.SyncPeriod,
7269
Node: config.Node,
7370
Namespace: config.Namespace,
74-
})
71+
}, nil)
7572
if err != nil {
7673
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Pod{}, err)
7774
}
7875

76+
var nodeMeta, namespaceMeta metadata.MetaGen
77+
var nodeWatcher, namespaceWatcher kubernetes.Watcher
78+
metaConf := config.AddResourceMetadata
79+
if metaConf != nil {
80+
if metaConf.Node != nil && metaConf.Node.Enabled() {
81+
options := kubernetes.WatchOptions{
82+
SyncTimeout: config.SyncPeriod,
83+
Node: config.Node,
84+
}
85+
if config.Namespace != "" {
86+
options.Namespace = config.Namespace
87+
}
88+
nodeWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
89+
if err != nil {
90+
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
91+
}
92+
93+
nodeMeta = metadata.NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store())
94+
}
95+
96+
if metaConf.Namespace != nil && metaConf.Namespace.Enabled() {
97+
namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
98+
SyncTimeout: config.SyncPeriod,
99+
}, nil)
100+
if err != nil {
101+
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
102+
}
103+
104+
namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store())
105+
}
106+
}
107+
79108
p := &pod{
80-
config: config,
81-
uuid: uuid,
82-
publish: publish,
83-
metagen: metagen,
84-
logger: logger,
85-
watcher: watcher,
109+
config: config,
110+
uuid: uuid,
111+
publish: publish,
112+
metagen: metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nodeMeta, namespaceMeta),
113+
logger: logger,
114+
watcher: watcher,
115+
nodeWatcher: nodeWatcher,
116+
namespaceWatcher: namespaceWatcher,
86117
}
87118

88119
watcher.AddEventHandler(p)
@@ -168,12 +199,33 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
168199

169200
// Start starts the eventer
170201
func (p *pod) Start() error {
202+
if p.nodeWatcher != nil {
203+
err := p.nodeWatcher.Start()
204+
if err != nil {
205+
return err
206+
}
207+
}
208+
209+
if p.namespaceWatcher != nil {
210+
if err := p.namespaceWatcher.Start(); err != nil {
211+
return err
212+
}
213+
}
214+
171215
return p.watcher.Start()
172216
}
173217

174218
// Stop stops the eventer
175219
func (p *pod) Stop() {
176220
p.watcher.Stop()
221+
222+
if p.namespaceWatcher != nil {
223+
p.namespaceWatcher.Stop()
224+
}
225+
226+
if p.nodeWatcher != nil {
227+
p.nodeWatcher.Stop()
228+
}
177229
}
178230

179231
func (p *pod) emit(pod *kubernetes.Pod, flag string) {
@@ -231,7 +283,8 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
231283
"image": c.Image,
232284
"runtime": runtimes[c.Name],
233285
}
234-
meta := p.metagen.ContainerMetadata(pod, c.Name, c.Image)
286+
meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name),
287+
metadata.WithFields("container.image", c.Image))
235288

236289
// Information that can be used in discovering a workload
237290
kubemeta := meta.Clone()

libbeat/autodiscover/providers/kubernetes/pod_test.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
25+
2426
"github.com/gofrs/uuid"
2527
"github.com/stretchr/testify/assert"
2628
v1 "k8s.io/api/core/v1"
@@ -175,6 +177,11 @@ func TestEmitEvent(t *testing.T) {
175177
t.Fatal(err)
176178
}
177179

180+
typeMeta := metav1.TypeMeta{
181+
Kind: "Pod",
182+
APIVersion: "v1",
183+
}
184+
178185
tests := []struct {
179186
Message string
180187
Flag string
@@ -192,6 +199,7 @@ func TestEmitEvent(t *testing.T) {
192199
Labels: map[string]string{},
193200
Annotations: map[string]string{},
194201
},
202+
TypeMeta: typeMeta,
195203
Status: v1.PodStatus{
196204
PodIP: podIP,
197205
ContainerStatuses: []kubernetes.PodContainerStatus{
@@ -264,6 +272,7 @@ func TestEmitEvent(t *testing.T) {
264272
Labels: map[string]string{},
265273
Annotations: map[string]string{},
266274
},
275+
TypeMeta: typeMeta,
267276
Status: v1.PodStatus{
268277
ContainerStatuses: []kubernetes.PodContainerStatus{
269278
{
@@ -295,6 +304,7 @@ func TestEmitEvent(t *testing.T) {
295304
Labels: map[string]string{},
296305
Annotations: map[string]string{},
297306
},
307+
TypeMeta: typeMeta,
298308
Status: v1.PodStatus{
299309
PodIP: podIP,
300310
ContainerStatuses: []kubernetes.PodContainerStatus{
@@ -326,6 +336,7 @@ func TestEmitEvent(t *testing.T) {
326336
Labels: map[string]string{},
327337
Annotations: map[string]string{},
328338
},
339+
TypeMeta: typeMeta,
329340
Status: v1.PodStatus{
330341
ContainerStatuses: []kubernetes.PodContainerStatus{
331342
{
@@ -393,6 +404,7 @@ func TestEmitEvent(t *testing.T) {
393404
Labels: map[string]string{},
394405
Annotations: map[string]string{},
395406
},
407+
TypeMeta: typeMeta,
396408
Status: v1.PodStatus{
397409
PodIP: podIP,
398410
ContainerStatuses: []kubernetes.PodContainerStatus{
@@ -459,11 +471,7 @@ func TestEmitEvent(t *testing.T) {
459471
t.Fatal(err)
460472
}
461473

462-
metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
463-
if err != nil {
464-
t.Fatal(err)
465-
}
466-
474+
metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
467475
p := &Provider{
468476
config: defaultConfig(),
469477
bus: bus.New("test"),

0 commit comments

Comments
 (0)