Skip to content

Commit 06f05ca

Browse files
authored
Remove unnecessary restarts of metricsets while using Node autodiscover (#19974)
1 parent fb52d26 commit 06f05ca

4 files changed

Lines changed: 228 additions & 5 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
150150
- Server-side TLS config now validates certificate and key are both specified {pull}19584[19584]
151151
- Fix terminating pod autodiscover issue. {pull}20084[20084]
152152
- Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054]
153+
- Remove unnecessary restarts of metricsets while using Node autodiscover {pull}19974[19974]
153154
- Output errors when Kibana index pattern setup fails. {pull}20121[20121]
154155

155156
*Auditbeat*

libbeat/autodiscover/providers/kubernetes/node.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/gofrs/uuid"
2525
v1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/api/equality"
2627
k8s "k8s.io/client-go/kubernetes"
2728

2829
"github.com/elastic/beats/v7/libbeat/autodiscover/builder"
@@ -66,6 +67,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
6667
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
6768
SyncTimeout: config.SyncPeriod,
6869
Node: config.Node,
70+
IsUpdated: isUpdated,
6971
}, nil)
7072

7173
if err != nil {
@@ -190,6 +192,39 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
190192
n.publish(event)
191193
}
192194

195+
func isUpdated(o, n interface{}) bool {
196+
old, _ := o.(*kubernetes.Node)
197+
new, _ := n.(*kubernetes.Node)
198+
199+
// Consider as not update in case one of the two objects is not a Node
200+
if old == nil || new == nil {
201+
return true
202+
}
203+
204+
// This is a resync. It is not an update
205+
if old.ResourceVersion == new.ResourceVersion {
206+
return false
207+
}
208+
209+
// If the old object and new object are different
210+
oldCopy := old.DeepCopy()
211+
oldCopy.ResourceVersion = ""
212+
213+
newCopy := new.DeepCopy()
214+
newCopy.ResourceVersion = ""
215+
216+
// If the old object and new object are different in either meta or spec then there is a valid change
217+
if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) {
218+
return true
219+
}
220+
221+
// If there is a change in the node status then there is a valid change.
222+
if isNodeReady(old) != isNodeReady(new) {
223+
return true
224+
}
225+
return false
226+
}
227+
193228
func getAddress(node *kubernetes.Node) string {
194229
for _, address := range node.Status.Addresses {
195230
if address.Type == v1.NodeExternalIP && address.Address != "" {

libbeat/autodiscover/providers/kubernetes/node_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,3 +278,178 @@ func TestEmitEvent_Node(t *testing.T) {
278278
})
279279
}
280280
}
281+
282+
func TestNode_isUpdated(t *testing.T) {
283+
tests := []struct {
284+
old *kubernetes.Node
285+
new *kubernetes.Node
286+
updated bool
287+
test string
288+
}{
289+
{
290+
test: "one of the objects is nil then its updated",
291+
old: nil,
292+
new: &kubernetes.Node{},
293+
updated: true,
294+
},
295+
{
296+
test: "both empty nodes should return not updated",
297+
old: &kubernetes.Node{},
298+
new: &kubernetes.Node{},
299+
updated: false,
300+
},
301+
{
302+
test: "resource version is the same should return not updated",
303+
old: &kubernetes.Node{
304+
ObjectMeta: kubernetes.ObjectMeta{
305+
ResourceVersion: "1",
306+
},
307+
},
308+
new: &kubernetes.Node{
309+
ObjectMeta: kubernetes.ObjectMeta{
310+
ResourceVersion: "1",
311+
},
312+
},
313+
},
314+
{
315+
test: "if meta changes then it should return updated",
316+
old: &kubernetes.Node{
317+
ObjectMeta: kubernetes.ObjectMeta{
318+
ResourceVersion: "1",
319+
Annotations: map[string]string{},
320+
},
321+
},
322+
new: &kubernetes.Node{
323+
ObjectMeta: kubernetes.ObjectMeta{
324+
ResourceVersion: "2",
325+
Annotations: map[string]string{
326+
"a": "b",
327+
},
328+
},
329+
},
330+
updated: true,
331+
},
332+
{
333+
test: "if spec changes then it should return updated",
334+
old: &kubernetes.Node{
335+
ObjectMeta: kubernetes.ObjectMeta{
336+
ResourceVersion: "1",
337+
Annotations: map[string]string{
338+
"a": "b",
339+
},
340+
},
341+
Spec: v1.NodeSpec{
342+
ProviderID: "1",
343+
Unschedulable: false,
344+
},
345+
},
346+
new: &kubernetes.Node{
347+
ObjectMeta: kubernetes.ObjectMeta{
348+
ResourceVersion: "2",
349+
Annotations: map[string]string{
350+
"a": "b",
351+
},
352+
},
353+
Spec: v1.NodeSpec{
354+
ProviderID: "1",
355+
Unschedulable: true,
356+
},
357+
},
358+
updated: true,
359+
},
360+
{
361+
test: "if overall status doesn't change then its not an update",
362+
old: &kubernetes.Node{
363+
ObjectMeta: kubernetes.ObjectMeta{
364+
ResourceVersion: "1",
365+
Annotations: map[string]string{
366+
"a": "b",
367+
},
368+
},
369+
Spec: v1.NodeSpec{
370+
ProviderID: "1",
371+
Unschedulable: true,
372+
},
373+
Status: v1.NodeStatus{
374+
Conditions: []v1.NodeCondition{
375+
{
376+
Type: v1.NodeReady,
377+
Status: v1.ConditionTrue,
378+
},
379+
},
380+
},
381+
},
382+
new: &kubernetes.Node{
383+
ObjectMeta: kubernetes.ObjectMeta{
384+
ResourceVersion: "2",
385+
Annotations: map[string]string{
386+
"a": "b",
387+
},
388+
},
389+
Spec: v1.NodeSpec{
390+
ProviderID: "1",
391+
Unschedulable: true,
392+
},
393+
Status: v1.NodeStatus{
394+
Conditions: []v1.NodeCondition{
395+
{
396+
Type: v1.NodeReady,
397+
Status: v1.ConditionTrue,
398+
},
399+
},
400+
},
401+
},
402+
updated: false,
403+
},
404+
{
405+
test: "if node status changes then its an update",
406+
old: &kubernetes.Node{
407+
ObjectMeta: kubernetes.ObjectMeta{
408+
ResourceVersion: "1",
409+
Annotations: map[string]string{
410+
"a": "b",
411+
},
412+
},
413+
Spec: v1.NodeSpec{
414+
ProviderID: "1",
415+
Unschedulable: true,
416+
},
417+
Status: v1.NodeStatus{
418+
Conditions: []v1.NodeCondition{
419+
{
420+
Type: v1.NodeReady,
421+
Status: v1.ConditionFalse,
422+
},
423+
},
424+
},
425+
},
426+
new: &kubernetes.Node{
427+
ObjectMeta: kubernetes.ObjectMeta{
428+
ResourceVersion: "2",
429+
Annotations: map[string]string{
430+
"a": "b",
431+
},
432+
},
433+
Spec: v1.NodeSpec{
434+
ProviderID: "1",
435+
Unschedulable: true,
436+
},
437+
Status: v1.NodeStatus{
438+
Conditions: []v1.NodeCondition{
439+
{
440+
Type: v1.NodeReady,
441+
Status: v1.ConditionTrue,
442+
},
443+
},
444+
},
445+
},
446+
updated: true,
447+
},
448+
}
449+
450+
for _, test := range tests {
451+
t.Run(test.test, func(t *testing.T) {
452+
assert.Equal(t, test.updated, isUpdated(test.old, test.new))
453+
})
454+
}
455+
}

libbeat/common/kubernetes/watcher.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ type WatchOptions struct {
6666
Node string
6767
// Namespace is used for filtering watched resource to given namespace, use "" for all namespaces
6868
Namespace string
69+
// IsUpdated allows registering a func that allows the invoker of the Watch to decide what amounts to an update
70+
// vs what does not.
71+
IsUpdated func(old, new interface{}) bool
6972
}
7073

7174
type item struct {
@@ -100,6 +103,19 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
100103
queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType)
101104
ctx, cancel := context.WithCancel(context.Background())
102105

106+
if opts.IsUpdated == nil {
107+
opts.IsUpdated = func(o, n interface{}) bool {
108+
old, _ := accessor.ResourceVersion(o.(runtime.Object))
109+
new, _ := accessor.ResourceVersion(n.(runtime.Object))
110+
111+
// Only enqueue changes that have a different resource versions to avoid processing resyncs.
112+
if old != new {
113+
return true
114+
}
115+
return false
116+
}
117+
}
118+
103119
w := &watcher{
104120
client: client,
105121
informer: informer,
@@ -119,11 +135,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption
119135
w.enqueue(o, delete)
120136
},
121137
UpdateFunc: func(o, n interface{}) {
122-
old, _ := accessor.ResourceVersion(o.(runtime.Object))
123-
new, _ := accessor.ResourceVersion(n.(runtime.Object))
124-
125-
// Only enqueue changes that have a different resource versions to avoid processing resyncs.
126-
if old != new {
138+
if opts.IsUpdated(o, n) {
127139
w.enqueue(n, update)
128140
}
129141
},

0 commit comments

Comments
 (0)