Skip to content

Commit 46d17b4

Browse files
authored
Add complete k8s metadata through composable provider (#27691)
1 parent 88b29b5 commit 46d17b4

10 files changed

Lines changed: 899 additions & 319 deletions

File tree

libbeat/cmd/instance/locker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func newLocker(b *Beat) *locker {
4444
}
4545
}
4646

47-
// lock attemps to acquire a lock on the data path for the currently-running
47+
// lock attempts to acquire a lock on the data path for the currently-running
4848
// Beat instance. If another Beats instance already has a lock on the same data path
4949
// an ErrAlreadyLocked error is returned.
5050
func (l *locker) lock() error {

x-pack/elastic-agent/CHANGELOG.next.asciidoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,5 @@
140140
- Add new --enroll-delay option for install and enroll commands. {pull}27118[27118]
141141
- Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236]
142142
- Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429]
143-
- Support ephemeral containers in Kubernetes dynamic provider. {issue}#27020[#27020] {pull}27707[27707]
143+
- Support ephemeral containers in Kubernetes dynamic provider. {issue}27020[#27020] {pull}27707[27707]
144+
- Add complete k8s metadata through composable provider. {pull}27691[27691]

x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ package kubernetes
1010
import (
1111
"time"
1212

13+
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
1314
"github.com/elastic/beats/v7/libbeat/logp"
1415
)
1516

@@ -25,6 +26,16 @@ type Config struct {
2526

2627
// Needed when resource is a Pod or Node
2728
Node string `config:"node"`
29+
30+
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
31+
IncludeLabels []string `config:"include_labels"`
32+
ExcludeLabels []string `config:"exclude_labels"`
33+
34+
LabelsDedot bool `config:"labels.dedot"`
35+
AnnotationsDedot bool `config:"annotations.dedot"`
36+
37+
// Undocumented settings, to be deprecated in favor of `drop_fields` processor:
38+
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
2839
}
2940

3041
// Resources config section for resources' config blocks
@@ -44,6 +55,9 @@ func (c *Config) InitDefaults() {
4455
c.CleanupTimeout = 60 * time.Second
4556
c.SyncPeriod = 10 * time.Minute
4657
c.Scope = "node"
58+
c.IncludeCreatorMetadata = true
59+
c.LabelsDedot = true
60+
c.AnnotationsDedot = true
4761
}
4862

4963
// Validate ensures correctness of config

x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,25 +46,26 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable
4646
if err != nil {
4747
return nil, errors.New(err, "failed to unpack configuration")
4848
}
49+
4950
return &dynamicProvider{logger, &cfg}, nil
5051
}
5152

5253
// Run runs the kubernetes context provider.
5354
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
5455
if p.config.Resources.Pod.Enabled {
55-
err := p.watchResource(comm, "pod", p.config)
56+
err := p.watchResource(comm, "pod")
5657
if err != nil {
5758
return err
5859
}
5960
}
6061
if p.config.Resources.Node.Enabled {
61-
err := p.watchResource(comm, "node", p.config)
62+
err := p.watchResource(comm, "node")
6263
if err != nil {
6364
return err
6465
}
6566
}
6667
if p.config.Resources.Service.Enabled {
67-
err := p.watchResource(comm, "service", p.config)
68+
err := p.watchResource(comm, "service")
6869
if err != nil {
6970
return err
7071
}
@@ -76,9 +77,8 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
7677
// and starts watching for such resource's events.
7778
func (p *dynamicProvider) watchResource(
7879
comm composable.DynamicProviderComm,
79-
resourceType string,
80-
config *Config) error {
81-
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
80+
resourceType string) error {
81+
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
8282
if err != nil {
8383
// info only; return nil (do nothing)
8484
p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)
@@ -93,24 +93,24 @@ func (p *dynamicProvider) watchResource(
9393
p.logger.Debugf(
9494
"Initializing Kubernetes watcher for resource %s using node: %v",
9595
resourceType,
96-
config.Node)
96+
p.config.Node)
9797
nd := &kubernetes.DiscoverKubernetesNodeParams{
98-
ConfigHost: config.Node,
98+
ConfigHost: p.config.Node,
9999
Client: client,
100-
IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
100+
IsInCluster: kubernetes.IsInCluster(p.config.KubeConfig),
101101
HostUtils: &kubernetes.DefaultDiscoveryUtils{},
102102
}
103-
config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
103+
p.config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
104104
if err != nil {
105105
p.logger.Debugf("Kubernetes provider skipped, unable to discover node: %w", err)
106106
return nil
107107
}
108108

109109
} else {
110-
config.Node = ""
110+
p.config.Node = ""
111111
}
112112

113-
watcher, err := p.newWatcher(resourceType, comm, client, config)
113+
watcher, err := p.newWatcher(resourceType, comm, client)
114114
if err != nil {
115115
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
116116
}
@@ -126,23 +126,22 @@ func (p *dynamicProvider) watchResource(
126126
func (p *dynamicProvider) newWatcher(
127127
resourceType string,
128128
comm composable.DynamicProviderComm,
129-
client k8s.Interface,
130-
config *Config) (kubernetes.Watcher, error) {
129+
client k8s.Interface) (kubernetes.Watcher, error) {
131130
switch resourceType {
132131
case "pod":
133-
watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
132+
watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope)
134133
if err != nil {
135134
return nil, err
136135
}
137136
return watcher, nil
138137
case "node":
139-
watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope)
138+
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope)
140139
if err != nil {
141140
return nil, err
142141
}
143142
return watcher, nil
144143
case "service":
145-
watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope)
144+
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope)
146145
if err != nil {
147146
return nil, err
148147
}

x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/elastic/beats/v7/libbeat/common"
1515
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
16+
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
1617
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
1718
"github.com/elastic/beats/v7/libbeat/logp"
1819
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
@@ -25,6 +26,7 @@ type node struct {
2526
comm composable.DynamicProviderComm
2627
scope string
2728
config *Config
29+
metagen metadata.MetaGen
2830
}
2931

3032
type nodeData struct {
@@ -49,13 +51,25 @@ func NewNodeWatcher(
4951
if err != nil {
5052
return nil, errors.New(err, "couldn't create kubernetes watcher")
5153
}
52-
watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg})
54+
55+
rawConfig, err := common.NewConfigFrom(cfg)
56+
if err != nil {
57+
return nil, errors.New(err, "failed to unpack configuration")
58+
}
59+
metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client)
60+
watcher.AddEventHandler(&node{
61+
logger,
62+
cfg.CleanupTimeout,
63+
comm,
64+
scope,
65+
cfg,
66+
metaGen})
5367

5468
return watcher, nil
5569
}
5670

5771
func (n *node) emitRunning(node *kubernetes.Node) {
58-
data := generateNodeData(node, n.config)
72+
data := generateNodeData(node, n.config, n.metagen)
5973
if data == nil {
6074
return
6175
}
@@ -165,7 +179,7 @@ func isNodeReady(node *kubernetes.Node) bool {
165179
return false
166180
}
167181

168-
func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
182+
func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.MetaGen) *nodeData {
169183
host := getAddress(node)
170184

171185
// If a node doesn't have an IP then dont monitor it
@@ -178,41 +192,40 @@ func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
178192
return nil
179193
}
180194

181-
//TODO: add metadata here too ie -> meta := n.metagen.Generate(node)
195+
meta := kubeMetaGen.Generate(node)
196+
kubemetaMap, err := meta.GetValue("kubernetes")
197+
if err != nil {
198+
return &nodeData{}
199+
}
182200

183201
// Pass annotations to all events so that it can be used in templating and by annotation builders.
184202
annotations := common.MapStr{}
185203
for k, v := range node.GetObjectMeta().GetAnnotations() {
186204
safemapstr.Put(annotations, k, v)
187205
}
188206

189-
labels := common.MapStr{}
190-
for k, v := range node.GetObjectMeta().GetLabels() {
191-
// TODO: add dedoting option
192-
safemapstr.Put(labels, k, v)
193-
}
207+
// k8sMapping includes only the metadata that fall under kubernetes.*
208+
// and these are available as dynamic vars through the provider
209+
k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone())
194210

195-
mapping := map[string]interface{}{
196-
"node": map[string]interface{}{
197-
"uid": string(node.GetUID()),
198-
"name": node.GetName(),
199-
"labels": labels,
200-
"annotations": annotations,
201-
"ip": host,
202-
},
203-
}
211+
// add annotations to be discoverable by templates
212+
k8sMapping["annotations"] = annotations
204213

205-
processors := []map[string]interface{}{
206-
{
214+
processors := []map[string]interface{}{}
215+
// meta map includes metadata that go under kubernetes.*
216+
// but also other ECS fields like orchestrator.*
217+
for field, metaMap := range meta {
218+
processor := map[string]interface{}{
207219
"add_fields": map[string]interface{}{
208-
"fields": mapping,
209-
"target": "kubernetes",
220+
"fields": metaMap,
221+
"target": field,
210222
},
211-
},
223+
}
224+
processors = append(processors, processor)
212225
}
213226
return &nodeData{
214227
node: node,
215-
mapping: mapping,
228+
mapping: k8sMapping,
216229
processors: processors,
217230
}
218231
}

x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ package kubernetes
77
import (
88
"testing"
99

10+
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
11+
1012
"github.com/elastic/beats/v7/libbeat/common"
1113

1214
"github.com/stretchr/testify/assert"
@@ -41,32 +43,96 @@ func TestGenerateNodeData(t *testing.T) {
4143
},
4244
}
4345

44-
data := generateNodeData(node, &Config{})
46+
data := generateNodeData(node, &Config{}, &nodeMeta{})
4547

4648
mapping := map[string]interface{}{
47-
"node": map[string]interface{}{
49+
"node": common.MapStr{
4850
"uid": string(node.GetUID()),
4951
"name": node.GetName(),
50-
"labels": common.MapStr{
51-
"foo": "bar",
52-
},
53-
"annotations": common.MapStr{
54-
"baz": "ban",
55-
},
56-
"ip": "node1",
52+
"ip": "node1",
53+
},
54+
"annotations": common.MapStr{
55+
"baz": "ban",
56+
},
57+
"labels": common.MapStr{
58+
"foo": "bar",
5759
},
5860
}
5961

60-
processors := []map[string]interface{}{
61-
{
62-
"add_fields": map[string]interface{}{
63-
"fields": mapping,
64-
"target": "kubernetes",
65-
},
62+
processors := map[string]interface{}{
63+
"orchestrator": common.MapStr{
64+
"cluster": common.MapStr{
65+
"name": "devcluster",
66+
"url": "8.8.8.8:9090"},
67+
}, "kubernetes": common.MapStr{
68+
"labels": common.MapStr{"foo": "bar"},
69+
"annotations": common.MapStr{"baz": "ban"},
70+
"node": common.MapStr{
71+
"ip": "node1",
72+
"name": "testnode",
73+
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133"},
6674
},
6775
}
68-
6976
assert.Equal(t, node, data.node)
7077
assert.Equal(t, mapping, data.mapping)
71-
assert.Equal(t, processors, data.processors)
78+
for _, v := range data.processors {
79+
k := v["add_fields"].(map[string]interface{})
80+
target := k["target"].(string)
81+
fields := k["fields"]
82+
assert.Equal(t, processors[target], fields)
83+
}
84+
}
85+
86+
type nodeMeta struct{}
87+
88+
// Generate generates node metadata from a resource object
89+
// Metadata map is in the following form:
90+
// {
91+
// "kubernetes": {},
92+
// "some.ecs.field": "asdf"
93+
// }
94+
// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by
95+
// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method
96+
func (n *nodeMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
97+
ecsFields := n.GenerateECS(obj)
98+
meta := common.MapStr{
99+
"kubernetes": n.GenerateK8s(obj, opts...),
100+
}
101+
meta.DeepUpdate(ecsFields)
102+
return meta
103+
}
104+
105+
// GenerateECS generates node ECS metadata from a resource object
106+
func (n *nodeMeta) GenerateECS(obj kubernetes.Resource) common.MapStr {
107+
return common.MapStr{
108+
"orchestrator": common.MapStr{
109+
"cluster": common.MapStr{
110+
"name": "devcluster",
111+
"url": "8.8.8.8:9090",
112+
},
113+
},
114+
}
115+
}
116+
117+
// GenerateK8s generates node metadata from a resource object
118+
func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
119+
k8sNode := obj.(*kubernetes.Node)
120+
return common.MapStr{
121+
"node": common.MapStr{
122+
"uid": string(k8sNode.GetUID()),
123+
"name": k8sNode.GetName(),
124+
"ip": "node1",
125+
},
126+
"labels": common.MapStr{
127+
"foo": "bar",
128+
},
129+
"annotations": common.MapStr{
130+
"baz": "ban",
131+
},
132+
}
133+
}
134+
135+
// GenerateFromName generates node metadata from a node name
136+
func (n *nodeMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr {
137+
return nil
72138
}

0 commit comments

Comments
 (0)