@@ -25,34 +25,31 @@ import (
2525 k8s "k8s.io/client-go/kubernetes"
2626
2727 "github.com/elastic/beats/libbeat/autodiscover/builder"
28-
2928 "github.com/elastic/beats/libbeat/common"
3029 "github.com/elastic/beats/libbeat/common/bus"
3130 "github.com/elastic/beats/libbeat/common/kubernetes"
31+ "github.com/elastic/beats/libbeat/common/kubernetes/metadata"
3232 "github.com/elastic/beats/libbeat/common/safemapstr"
3333 "github.com/elastic/beats/libbeat/logp"
3434)
3535
3636type pod struct {
37- uuid uuid.UUID
38- config * Config
39- metagen kubernetes.MetaGenerator
40- logger * logp.Logger
41- publish func (bus.Event )
42- watcher kubernetes.Watcher
37+ uuid uuid.UUID
38+ config * Config
39+ metagen metadata.MetaGen
40+ logger * logp.Logger
41+ publish func (bus.Event )
42+ watcher kubernetes.Watcher
43+ nodeWatcher kubernetes.Watcher
44+ namespaceWatcher kubernetes.Watcher
4345}
4446
4547// NewPodEventer creates an eventer that can discover and process pod objects
4648func NewPodEventer (uuid uuid.UUID , cfg * common.Config , client k8s.Interface , publish func (event bus.Event )) (Eventer , error ) {
47- metagen , err := kubernetes .NewMetaGenerator (cfg )
48- if err != nil {
49- return nil , err
50- }
51-
5249 logger := logp .NewLogger ("autodiscover.pod" )
5350
5451 config := defaultConfig ()
55- err = cfg .Unpack (& config )
52+ err : = cfg .Unpack (& config )
5653 if err != nil {
5754 return nil , err
5855 }
@@ -71,18 +68,52 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
7168 SyncTimeout : config .SyncPeriod ,
7269 Node : config .Node ,
7370 Namespace : config .Namespace ,
74- })
71+ }, nil )
7572 if err != nil {
7673 return nil , fmt .Errorf ("couldn't create watcher for %T due to error %+v" , & kubernetes.Pod {}, err )
7774 }
7875
76+ var nodeMeta , namespaceMeta metadata.MetaGen
77+ var nodeWatcher , namespaceWatcher kubernetes.Watcher
78+ metaConf := config .AddResourceMetadata
79+ if metaConf != nil {
80+ if metaConf .Node != nil && metaConf .Node .Enabled () {
81+ options := kubernetes.WatchOptions {
82+ SyncTimeout : config .SyncPeriod ,
83+ Node : config .Node ,
84+ }
85+ if config .Namespace != "" {
86+ options .Namespace = config .Namespace
87+ }
88+ nodeWatcher , err = kubernetes .NewWatcher (client , & kubernetes.Node {}, options , nil )
89+ if err != nil {
90+ return nil , fmt .Errorf ("couldn't create watcher for %T due to error %+v" , & kubernetes.Node {}, err )
91+ }
92+
93+ nodeMeta = metadata .NewNodeMetadataGenerator (metaConf .Node , nodeWatcher .Store ())
94+ }
95+
96+ if metaConf .Namespace != nil && metaConf .Namespace .Enabled () {
97+ namespaceWatcher , err = kubernetes .NewWatcher (client , & kubernetes.Namespace {}, kubernetes.WatchOptions {
98+ SyncTimeout : config .SyncPeriod ,
99+ }, nil )
100+ if err != nil {
101+ return nil , fmt .Errorf ("couldn't create watcher for %T due to error %+v" , & kubernetes.Namespace {}, err )
102+ }
103+
104+ namespaceMeta = metadata .NewNamespaceMetadataGenerator (metaConf .Namespace , namespaceWatcher .Store ())
105+ }
106+ }
107+
79108 p := & pod {
80- config : config ,
81- uuid : uuid ,
82- publish : publish ,
83- metagen : metagen ,
84- logger : logger ,
85- watcher : watcher ,
109+ config : config ,
110+ uuid : uuid ,
111+ publish : publish ,
112+ metagen : metadata .NewPodMetadataGenerator (cfg , watcher .Store (), nodeMeta , namespaceMeta ),
113+ logger : logger ,
114+ watcher : watcher ,
115+ nodeWatcher : nodeWatcher ,
116+ namespaceWatcher : namespaceWatcher ,
86117 }
87118
88119 watcher .AddEventHandler (p )
@@ -168,12 +199,33 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
168199
169200// Start starts the eventer
170201func (p * pod ) Start () error {
202+ if p .nodeWatcher != nil {
203+ err := p .nodeWatcher .Start ()
204+ if err != nil {
205+ return err
206+ }
207+ }
208+
209+ if p .namespaceWatcher != nil {
210+ if err := p .namespaceWatcher .Start (); err != nil {
211+ return err
212+ }
213+ }
214+
171215 return p .watcher .Start ()
172216}
173217
174218// Stop stops the eventer
175219func (p * pod ) Stop () {
176220 p .watcher .Stop ()
221+
222+ if p .namespaceWatcher != nil {
223+ p .namespaceWatcher .Stop ()
224+ }
225+
226+ if p .nodeWatcher != nil {
227+ p .nodeWatcher .Stop ()
228+ }
177229}
178230
179231func (p * pod ) emit (pod * kubernetes.Pod , flag string ) {
@@ -231,7 +283,8 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
231283 "image" : c .Image ,
232284 "runtime" : runtimes [c .Name ],
233285 }
234- meta := p .metagen .ContainerMetadata (pod , c .Name , c .Image )
286+ meta := p .metagen .Generate (pod , metadata .WithFields ("container.name" , c .Name ),
287+ metadata .WithFields ("container.image" , c .Image ))
235288
236289 // Information that can be used in discovering a workload
237290 kubemeta := meta .Clone ()
0 commit comments