Skip to content

Commit a690364

Browse files
authored
Port IncludeAnnotations setting to Agent and small manifest fix (#28247)
1 parent 17745e2 commit a690364

7 files changed

Lines changed: 111 additions & 32 deletions

File tree

deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ data:
441441
# maxconn: 10
442442
# network: tcp
443443
# period: 10s
444-
# condition: ${kubernetes.pod.labels.app} == 'redis'
444+
# condition: ${kubernetes.labels.app} == 'redis'
445445
---
446446
apiVersion: apps/v1
447447
kind: DaemonSet

deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,4 +441,4 @@ data:
441441
# maxconn: 10
442442
# network: tcp
443443
# period: 10s
444-
# condition: ${kubernetes.pod.labels.app} == 'redis'
444+
# condition: ${kubernetes.labels.app} == 'redis'

x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Config struct {
3030
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
3131
IncludeLabels []string `config:"include_labels"`
3232
ExcludeLabels []string `config:"exclude_labels"`
33+
IncludeAnnotations []string `config:"include_annotations"`
3334

3435
LabelsDedot bool `config:"labels.dedot"`
3536
AnnotationsDedot bool `config:"annotations.dedot"`

x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -110,42 +110,50 @@ func (p *dynamicProvider) watchResource(
110110
p.config.Node = ""
111111
}
112112

113-
watcher, err := p.newWatcher(resourceType, comm, client)
113+
eventer, err := p.newEventer(resourceType, comm, client)
114114
if err != nil {
115115
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
116116
}
117117

118-
err = watcher.Start()
118+
err = eventer.Start()
119119
if err != nil {
120-
return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType)
120+
return errors.New(err, "couldn't start kubernetes eventer for resource %s", resourceType)
121121
}
122+
122123
return nil
123124
}
124125

125-
// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
126-
func (p *dynamicProvider) newWatcher(
126+
// Eventer allows defining ways in which kubernetes resource events are observed and processed
127+
type Eventer interface {
128+
kubernetes.ResourceEventHandler
129+
Start() error
130+
Stop()
131+
}
132+
133+
// newEventer initializes the proper eventer according to the given resource (pod, node, service).
134+
func (p *dynamicProvider) newEventer(
127135
resourceType string,
128136
comm composable.DynamicProviderComm,
129-
client k8s.Interface) (kubernetes.Watcher, error) {
137+
client k8s.Interface) (Eventer, error) {
130138
switch resourceType {
131139
case "pod":
132-
watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope)
140+
eventer, err := NewPodEventer(comm, p.config, p.logger, client, p.config.Scope)
133141
if err != nil {
134142
return nil, err
135143
}
136-
return watcher, nil
144+
return eventer, nil
137145
case "node":
138-
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope)
146+
eventer, err := NewNodeEventer(comm, p.config, p.logger, client, p.config.Scope)
139147
if err != nil {
140148
return nil, err
141149
}
142-
return watcher, nil
150+
return eventer, nil
143151
case "service":
144-
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope)
152+
eventer, err := NewServiceEventer(comm, p.config, p.logger, client, p.config.Scope)
145153
if err != nil {
146154
return nil, err
147155
}
148-
return watcher, nil
156+
return eventer, nil
149157
default:
150158
return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType)
151159
}

x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type node struct {
2727
scope string
2828
config *Config
2929
metagen metadata.MetaGen
30+
watcher kubernetes.Watcher
3031
}
3132

3233
type nodeData struct {
@@ -35,13 +36,13 @@ type nodeData struct {
3536
processors []map[string]interface{}
3637
}
3738

38-
// NewNodeWatcher creates a watcher that can discover and process node objects
39-
func NewNodeWatcher(
39+
// NewNodeEventer creates an eventer that can discover and process node objects
40+
func NewNodeEventer(
4041
comm composable.DynamicProviderComm,
4142
cfg *Config,
4243
logger *logp.Logger,
4344
client k8s.Interface,
44-
scope string) (kubernetes.Watcher, error) {
45+
scope string) (Eventer, error) {
4546
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
4647
SyncTimeout: cfg.SyncPeriod,
4748
Node: cfg.Node,
@@ -57,15 +58,17 @@ func NewNodeWatcher(
5758
return nil, errors.New(err, "failed to unpack configuration")
5859
}
5960
metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client)
60-
watcher.AddEventHandler(&node{
61+
n := &node{
6162
logger,
6263
cfg.CleanupTimeout,
6364
comm,
6465
scope,
6566
cfg,
66-
metaGen})
67+
metaGen,
68+
watcher}
69+
watcher.AddEventHandler(n)
6770

68-
return watcher, nil
71+
return n, nil
6972
}
7073

7174
func (n *node) emitRunning(node *kubernetes.Node) {
@@ -83,6 +86,16 @@ func (n *node) emitStopped(node *kubernetes.Node) {
8386
n.comm.Remove(string(node.GetUID()))
8487
}
8588

89+
// Start starts the eventer
90+
func (n *node) Start() error {
91+
return n.watcher.Start()
92+
}
93+
94+
// Stop stops the eventer
95+
func (n *node) Stop() {
96+
n.watcher.Stop()
97+
}
98+
8699
// OnAdd ensures processing of node objects that are newly created
87100
func (n *node) OnAdd(obj interface{}) {
88101
n.logger.Debugf("Watcher Node add: %+v", obj)

x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type pod struct {
2828
scope string
2929
config *Config
3030
metagen metadata.MetaGen
31+
watcher kubernetes.Watcher
32+
nodeWatcher kubernetes.Watcher
3133
namespaceWatcher kubernetes.Watcher
3234

3335
// Mutex used by configuration updates not triggered by the main watcher,
@@ -65,13 +67,13 @@ type namespacePodUpdater struct {
6567
locker sync.Locker
6668
}
6769

68-
// NewPodWatcher creates a watcher that can discover and process pod objects
69-
func NewPodWatcher(
70+
// NewPodEventer creates an eventer that can discover and process pod objects
71+
func NewPodEventer(
7072
comm composable.DynamicProviderComm,
7173
cfg *Config,
7274
logger *logp.Logger,
7375
client k8s.Interface,
74-
scope string) (kubernetes.Watcher, error) {
76+
scope string) (Eventer, error) {
7577
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
7678
SyncTimeout: cfg.SyncPeriod,
7779
Node: cfg.Node,
@@ -107,24 +109,57 @@ func NewPodWatcher(
107109
}
108110
metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf)
109111

110-
p := pod{
112+
p := &pod{
111113
logger: logger,
112114
cleanupTimeout: cfg.CleanupTimeout,
113115
comm: comm,
114116
scope: scope,
115117
config: cfg,
116118
metagen: metaGen,
119+
watcher: watcher,
120+
nodeWatcher: nodeWatcher,
117121
namespaceWatcher: namespaceWatcher,
118122
}
119123

120-
watcher.AddEventHandler(&p)
124+
watcher.AddEventHandler(p)
121125

122126
if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
123127
updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
124128
namespaceWatcher.AddEventHandler(updater)
125129
}
126130

127-
return watcher, nil
131+
return p, nil
132+
}
133+
134+
// Start starts the eventer
135+
func (p *pod) Start() error {
136+
if p.nodeWatcher != nil {
137+
err := p.nodeWatcher.Start()
138+
if err != nil {
139+
return err
140+
}
141+
}
142+
143+
if p.namespaceWatcher != nil {
144+
if err := p.namespaceWatcher.Start(); err != nil {
145+
return err
146+
}
147+
}
148+
149+
return p.watcher.Start()
150+
}
151+
152+
// Stop stops the eventer
153+
func (p *pod) Stop() {
154+
p.watcher.Stop()
155+
156+
if p.namespaceWatcher != nil {
157+
p.namespaceWatcher.Stop()
158+
}
159+
160+
if p.nodeWatcher != nil {
161+
p.nodeWatcher.Stop()
162+
}
128163
}
129164

130165
func (p *pod) emitRunning(pod *kubernetes.Pod) {

x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type service struct {
2727
scope string
2828
config *Config
2929
metagen metadata.MetaGen
30+
watcher kubernetes.Watcher
3031
namespaceWatcher kubernetes.Watcher
3132
}
3233

@@ -36,13 +37,13 @@ type serviceData struct {
3637
processors []map[string]interface{}
3738
}
3839

39-
// NewServiceWatcher creates a watcher that can discover and process service objects
40-
func NewServiceWatcher(
40+
// NewServiceEventer creates an eventer that can discover and process service objects
41+
func NewServiceEventer(
4142
comm composable.DynamicProviderComm,
4243
cfg *Config,
4344
logger *logp.Logger,
4445
client k8s.Interface,
45-
scope string) (kubernetes.Watcher, error) {
46+
scope string) (Eventer, error) {
4647
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{
4748
SyncTimeout: cfg.SyncPeriod,
4849
Node: cfg.Node,
@@ -68,17 +69,38 @@ func NewServiceWatcher(
6869
}
6970

7071
metaGen := metadata.NewServiceMetadataGenerator(rawConfig, watcher.Store(), namespaceMeta, client)
71-
watcher.AddEventHandler(&service{
72+
s := &service{
7273
logger,
7374
cfg.CleanupTimeout,
7475
comm,
7576
scope,
7677
cfg,
7778
metaGen,
79+
watcher,
7880
namespaceWatcher,
79-
})
81+
}
82+
watcher.AddEventHandler(s)
83+
84+
return s, nil
85+
}
8086

81-
return watcher, nil
87+
// Start starts the eventer
88+
func (s *service) Start() error {
89+
if s.namespaceWatcher != nil {
90+
if err := s.namespaceWatcher.Start(); err != nil {
91+
return err
92+
}
93+
}
94+
return s.watcher.Start()
95+
}
96+
97+
// Stop stops the eventer
98+
func (s *service) Stop() {
99+
s.watcher.Stop()
100+
101+
if s.namespaceWatcher != nil {
102+
s.namespaceWatcher.Stop()
103+
}
82104
}
83105

84106
func (s *service) emitRunning(service *kubernetes.Service) {

0 commit comments

Comments
 (0)