Skip to content

Commit 1207d63

Browse files
authored
Skip config check in autodiscover for duplicated configurations (#29048)
If the configuration is already running, it has been already checked, don't try to check it again to avoid problems with configuration checks that fail if some resource already exist with the same identifiers.
1 parent 3e84736 commit 1207d63

3 files changed

Lines changed: 91 additions & 13 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
144144
- Allows disable pod events enrichment with deployment name {pull}28521[28521]
145145
- Fix `fingerprint` processor to give it access to the `@timestamp` field. {issue}28683[28683]
146146
- Fix the wrong beat name on monitoring and state endpoint {issue}27755[27755]
147+
- Skip configuration checks in autodiscover for configurations that are already running {pull}29048[29048]
147148

148149
*Auditbeat*
149150

libbeat/autodiscover/autodiscover.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ type Autodiscover struct {
6464
meta *meta.Map
6565
listener bus.Listener
6666
logger *logp.Logger
67+
68+
// workDone is a channel used for testing purpouses, to know when the worker has
69+
// done some work.
70+
workDone chan struct{}
6771
}
6872

6973
// NewAutodiscover instantiates and returns a new Autodiscover manager
@@ -165,6 +169,11 @@ func (a *Autodiscover) worker() {
165169
// reset updated status
166170
updated = false
167171
}
172+
173+
// For testing purpouses.
174+
if a.workDone != nil {
175+
a.workDone <- struct{}{}
176+
}
168177
}
169178
}
170179

@@ -207,26 +216,30 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
207216
continue
208217
}
209218

210-
err = a.factory.CheckConfig(config)
211-
if err != nil {
212-
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
213-
"Auto discover config check failed for config '%s', won't start runner",
214-
common.DebugString(config, true))))
215-
continue
216-
}
217-
218219
// Update meta no matter what
219220
dynFields := a.meta.Store(hash, meta)
220221

222+
if _, ok := newCfg[hash]; ok {
223+
a.logger.Debugf("Config %v duplicated in start event", common.DebugString(config, true))
224+
continue
225+
}
226+
221227
if cfg, ok := a.configs[eventID][hash]; ok {
222228
a.logger.Debugf("Config %v is already running", common.DebugString(config, true))
223229
newCfg[hash] = cfg
224230
continue
225-
} else {
226-
newCfg[hash] = &reload.ConfigWithMeta{
227-
Config: config,
228-
Meta: &dynFields,
229-
}
231+
}
232+
233+
err = a.factory.CheckConfig(config)
234+
if err != nil {
235+
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
236+
"Auto discover config check failed for config '%s', won't start runner",
237+
common.DebugString(config, true))))
238+
continue
239+
}
240+
newCfg[hash] = &reload.ConfigWithMeta{
241+
Config: config,
242+
Meta: &dynFields,
230243
}
231244

232245
updated = true

libbeat/autodiscover/autodiscover_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ type mockAdapter struct {
7575
mutex sync.Mutex
7676
configs []*common.Config
7777
runners []*mockRunner
78+
79+
CheckConfigCallCount int
7880
}
7981

8082
// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter`
@@ -87,6 +89,8 @@ func (m *mockAdapter) CreateConfig(event bus.Event) ([]*common.Config, error) {
8789

8890
// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
8991
func (m *mockAdapter) CheckConfig(c *common.Config) error {
92+
m.CheckConfigCallCount++
93+
9094
config := struct {
9195
Broken bool `config:"broken"`
9296
}{}
@@ -324,6 +328,66 @@ func TestAutodiscoverHash(t *testing.T) {
324328
assert.False(t, runners[1].stopped)
325329
}
326330

331+
func TestAutodiscoverDuplicatedConfigConfigCheckCalledOnce(t *testing.T) {
332+
goroutines := resources.NewGoroutinesChecker()
333+
defer goroutines.Check(t)
334+
335+
// Register mock autodiscover provider
336+
busChan := make(chan bus.Bus, 1)
337+
338+
Registry = NewRegistry()
339+
Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) {
340+
// intercept bus to mock events
341+
busChan <- b
342+
343+
return &mockProvider{}, nil
344+
})
345+
346+
// Create a mock adapter that returns a duplicated config
347+
runnerConfig, _ := common.NewConfigFrom(map[string]string{
348+
"id": "foo",
349+
})
350+
adapter := mockAdapter{
351+
configs: []*common.Config{runnerConfig, runnerConfig},
352+
}
353+
354+
// and settings:
355+
providerConfig, _ := common.NewConfigFrom(map[string]string{
356+
"type": "mock",
357+
})
358+
config := Config{
359+
Providers: []*common.Config{providerConfig},
360+
}
361+
k, _ := keystore.NewFileKeystore("test")
362+
// Create autodiscover manager
363+
autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k)
364+
if err != nil {
365+
t.Fatal(err)
366+
}
367+
368+
autodiscover.workDone = make(chan struct{})
369+
370+
// Start it
371+
autodiscover.Start()
372+
defer autodiscover.Stop()
373+
eventBus := <-busChan
374+
375+
// Publish a couple of events.
376+
for i := 0; i < 2; i++ {
377+
eventBus.Publish(bus.Event{
378+
"id": "foo",
379+
"provider": "mock",
380+
"start": true,
381+
"meta": common.MapStr{
382+
"foo": "bar",
383+
},
384+
})
385+
<-autodiscover.workDone
386+
assert.Equal(t, 1, len(adapter.Runners()), "Only one runner should be started")
387+
assert.Equal(t, 1, adapter.CheckConfigCallCount, "Check config should have been called only once")
388+
}
389+
}
390+
327391
func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
328392
goroutines := resources.NewGoroutinesChecker()
329393
defer goroutines.Check(t)

0 commit comments

Comments
 (0)