@@ -19,11 +19,11 @@ package kubernetes
1919
2020import (
2121 "fmt"
22+ "sync"
2223 "time"
2324
2425 "github.com/gofrs/uuid"
2526 k8s "k8s.io/client-go/kubernetes"
26- "k8s.io/client-go/tools/cache"
2727
2828 "github.com/elastic/beats/v7/libbeat/autodiscover/builder"
2929 "github.com/elastic/beats/v7/libbeat/common"
@@ -43,7 +43,11 @@ type pod struct {
4343 watcher kubernetes.Watcher
4444 nodeWatcher kubernetes.Watcher
4545 namespaceWatcher kubernetes.Watcher
46- namespaceStore cache.Store
46+
47+ // Mutex used by configuration updates not triggered by the main watcher,
48+ // to avoid race conditions between cross updates and deletions.
49+ // Other updaters must use a write lock.
50+ crossUpdate sync.RWMutex
4751}
4852
4953// NewPodEventer creates an eventer that can discover and process pod objects
@@ -111,11 +115,20 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
111115 }
112116
113117 watcher .AddEventHandler (p )
118+
119+ if namespaceWatcher != nil && (config .Hints .Enabled () || metaConf .Namespace .Enabled ()) {
120+ updater := newNamespacePodUpdater (p .unlockedUpdate , watcher .Store (), & p .crossUpdate )
121+ namespaceWatcher .AddEventHandler (updater )
122+ }
123+
114124 return p , nil
115125}
116126
117127// OnAdd ensures processing of pod objects that are newly added
118128func (p * pod ) OnAdd (obj interface {}) {
129+ p .crossUpdate .RLock ()
130+ defer p .crossUpdate .RUnlock ()
131+
119132 p .logger .Debugf ("Watcher Pod add: %+v" , obj )
120133 p .emit (obj .(* kubernetes.Pod ), "start" )
121134}
@@ -124,6 +137,13 @@ func (p *pod) OnAdd(obj interface{}) {
124137// if it is terminating, a stop event is scheduled, if not, a stop and a start
125138// events are sent sequentially to recreate the resources assotiated to the pod.
126139func (p * pod ) OnUpdate (obj interface {}) {
140+ p .crossUpdate .RLock ()
141+ defer p .crossUpdate .RUnlock ()
142+
143+ p .unlockedUpdate (obj )
144+ }
145+
146+ func (p * pod ) unlockedUpdate (obj interface {}) {
127147 pod := obj .(* kubernetes.Pod )
128148
129149 p .logger .Debugf ("Watcher Pod update for pod: %+v, status: %+v" , pod .Name , pod .Status .Phase )
@@ -162,6 +182,9 @@ func (p *pod) OnUpdate(obj interface{}) {
162182
163183// OnDelete stops pod objects that are deleted
164184func (p * pod ) OnDelete (obj interface {}) {
185+ p .crossUpdate .RLock ()
186+ defer p .crossUpdate .RUnlock ()
187+
165188 p .logger .Debugf ("Watcher Pod delete: %+v" , obj )
166189 time .AfterFunc (p .config .CleanupTimeout , func () { p .emit (obj .(* kubernetes.Pod ), "stop" ) })
167190}
@@ -448,3 +471,60 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
448471 p .publish (events )
449472 }
450473}
474+
475+ // podUpdaterHandlerFunc is a function that handles pod updater notifications.
476+ type podUpdaterHandlerFunc func (interface {})
477+
478+ // podUpdaterStore is the interface that an object needs to implement to be
479+ // used as a pod updater store.
480+ type podUpdaterStore interface {
481+ List () []interface {}
482+ }
483+
484+ // namespacePodUpdater notifies updates on pods when their namespaces are updated.
485+ type namespacePodUpdater struct {
486+ handler podUpdaterHandlerFunc
487+ store podUpdaterStore
488+ locker sync.Locker
489+ }
490+
491+ // newNamespacePodUpdater creates a namespacePodUpdater
492+ func newNamespacePodUpdater (handler podUpdaterHandlerFunc , store podUpdaterStore , locker sync.Locker ) * namespacePodUpdater {
493+ return & namespacePodUpdater {
494+ handler : handler ,
495+ store : store ,
496+ locker : locker ,
497+ }
498+ }
499+
500+ // OnUpdate handles update events on namespaces.
501+ func (n * namespacePodUpdater ) OnUpdate (obj interface {}) {
502+ ns , ok := obj .(* kubernetes.Namespace )
503+ if ! ok {
504+ return
505+ }
506+
507+ // n.store.List() returns a snapshot at this point. If a delete is received
508+ // from the main watcher, this loop may generate an update event after the
509+ // delete is processed, leaving configurations that would never be deleted.
510+ // Also this loop can miss updates, what could leave outdated configurations.
511+ // Avoid these issues by locking the processing of events from the main watcher.
512+ if n .locker != nil {
513+ n .locker .Lock ()
514+ defer n .locker .Unlock ()
515+ }
516+ for _ , pod := range n .store .List () {
517+ pod , ok := pod .(* kubernetes.Pod )
518+ if ok && pod .Namespace == ns .Name {
519+ n .handler (pod )
520+ }
521+ }
522+ }
523+
524+ // OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
525+ // namespace they will generate their own add events.
526+ func (* namespacePodUpdater ) OnAdd (interface {}) {}
527+
528+ // OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
529+ // namespace they will generate their own delete events.
530+ func (* namespacePodUpdater ) OnDelete (interface {}) {}
0 commit comments