Skip to content

Commit a70d6e8

Browse files
authored
[Metricbeat] Support processors defined for light modules (#15923)
* Adjust test cases first * Setup processors for light modules * Fix: comments, mage check * Adjust code * Fix: missing comment * Fix: mage check * Test light modules * Test connector * Test runner * Fix: ToLower * Adjust code after review * Fix: mage check * Adjust code after review * Adjust code after review * Fix: check error * Fix: imports * Increase test coverage * Add unit tests * Fix: hound * beater: use factory * Beater: modules * Fix: system tests * Fix: implements interface * Add changelog entry * Verify if processors are setup
1 parent 6273589 commit a70d6e8

31 files changed

Lines changed: 586 additions & 383 deletions

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
125125
- Add `key/value` mode for SQL module. {issue}15770[15770] {pull]15845[15845]
126126
- Add STAN dashboard {pull}15654[15654]
127127
- Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822]
128+
- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923]
128129

129130
*Packetbeat*
130131

metricbeat/beater/metricbeat.go

Lines changed: 19 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,16 @@ package beater
2020
import (
2121
"sync"
2222

23-
"github.com/elastic/beats/libbeat/common/reload"
24-
"github.com/elastic/beats/libbeat/management"
25-
"github.com/elastic/beats/libbeat/paths"
26-
27-
"github.com/joeshaw/multierror"
2823
"github.com/pkg/errors"
2924

3025
"github.com/elastic/beats/libbeat/autodiscover"
3126
"github.com/elastic/beats/libbeat/beat"
3227
"github.com/elastic/beats/libbeat/cfgfile"
3328
"github.com/elastic/beats/libbeat/common"
29+
"github.com/elastic/beats/libbeat/common/reload"
3430
"github.com/elastic/beats/libbeat/logp"
31+
"github.com/elastic/beats/libbeat/management"
32+
"github.com/elastic/beats/libbeat/paths"
3533
"github.com/elastic/beats/metricbeat/mb"
3634
"github.com/elastic/beats/metricbeat/mb/module"
3735

@@ -44,20 +42,15 @@ import (
4442

4543
// Metricbeat implements the Beater interface for metricbeat.
4644
type Metricbeat struct {
47-
done chan struct{} // Channel used to initiate shutdown.
48-
modules []staticModule // Active list of modules.
45+
done chan struct{} // Channel used to initiate shutdown.
46+
runners []module.Runner // Active list of module runners.
4947
config Config
5048
autodiscover *autodiscover.Autodiscover
5149

5250
// Options
5351
moduleOptions []module.Option
5452
}
5553

56-
type staticModule struct {
57-
connector *module.Connector
58-
module *module.Wrapper
59-
}
60-
6154
// Option specifies some optional arguments used for configuring the behavior
6255
// of the Metricbeat framework.
6356
type Option func(mb *Metricbeat)
@@ -162,46 +155,28 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
162155
moduleOptions := append(
163156
[]module.Option{module.WithMaxStartDelay(config.MaxStartDelay)},
164157
metricbeat.moduleOptions...)
165-
var errs multierror.Errors
158+
159+
factory := module.NewFactory(b.Info, moduleOptions...)
160+
166161
for _, moduleCfg := range config.Modules {
167162
if !moduleCfg.Enabled() {
168163
continue
169164
}
170165

171-
failed := false
172-
173-
connector, err := module.NewConnector(b.Info, b.Publisher, moduleCfg, nil)
166+
runner, err := factory.Create(b.Publisher, moduleCfg, nil)
174167
if err != nil {
175-
errs = append(errs, err)
176-
failed = true
177-
}
178-
179-
module, err := module.NewWrapper(moduleCfg, mb.Registry, moduleOptions...)
180-
if err != nil {
181-
errs = append(errs, err)
182-
failed = true
183-
}
184-
185-
if failed {
186-
continue
168+
return nil, err
187169
}
188170

189-
metricbeat.modules = append(metricbeat.modules, staticModule{
190-
connector: connector,
191-
module: module,
192-
})
171+
metricbeat.runners = append(metricbeat.runners, runner)
193172
}
194173

195-
if err := errs.Err(); err != nil {
196-
return nil, err
197-
}
198-
if len(metricbeat.modules) == 0 && !dynamicCfgEnabled {
174+
if len(metricbeat.runners) == 0 && !dynamicCfgEnabled {
199175
return nil, mb.ErrAllModulesDisabled
200176
}
201177

202178
if config.Autodiscover != nil {
203179
var err error
204-
factory := module.NewFactory(b.Info, metricbeat.moduleOptions...)
205180
adapter := autodiscover.NewFactoryAdapter(factory)
206181
metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover)
207182
if err != nil {
@@ -220,20 +195,16 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
220195
func (bt *Metricbeat) Run(b *beat.Beat) error {
221196
var wg sync.WaitGroup
222197

223-
// Static modules (metricbeat.modules)
224-
for _, m := range bt.modules {
225-
client, err := m.connector.Connect()
226-
if err != nil {
227-
return err
228-
}
229-
230-
r := module.NewRunner(client, m.module)
198+
// Static modules (metricbeat.runners)
199+
for _, r := range bt.runners {
231200
r.Start()
232201
wg.Add(1)
202+
203+
thatRunner := r
233204
go func() {
234205
defer wg.Done()
235206
<-bt.done
236-
r.Stop()
207+
thatRunner.Stop()
237208
}()
238209
}
239210

@@ -289,38 +260,7 @@ func (bt *Metricbeat) Stop() {
289260
close(bt.done)
290261
}
291262

292-
// Modules return a list of all configured modules, including anyone present
293-
// under dynamic config settings.
263+
// Modules return a list of all configured modules.
294264
func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) {
295-
var modules []*module.Wrapper
296-
for _, m := range bt.modules {
297-
modules = append(modules, m.module)
298-
}
299-
300-
// Add dynamic modules
301-
if bt.config.ConfigModules.Enabled() {
302-
config := cfgfile.DefaultDynamicConfig
303-
bt.config.ConfigModules.Unpack(&config)
304-
305-
modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled")
306-
if err != nil {
307-
return nil, errors.Wrap(err, "initialization error")
308-
}
309-
310-
for _, file := range modulesManager.ListEnabled() {
311-
confs, err := cfgfile.LoadList(file.Path)
312-
if err != nil {
313-
return nil, errors.Wrap(err, "error loading config files")
314-
}
315-
for _, conf := range confs {
316-
m, err := module.NewWrapper(conf, mb.Registry, bt.moduleOptions...)
317-
if err != nil {
318-
return nil, errors.Wrap(err, "module initialization error")
319-
}
320-
modules = append(modules, m)
321-
}
322-
}
323-
}
324-
325-
return modules, nil
265+
return module.ConfiguredModules(bt.config.Modules, bt.config.ConfigModules, bt.moduleOptions)
326266
}

metricbeat/docs/modules/activemq.asciidoc

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -35,36 +35,6 @@ metricbeat.modules:
3535
path: '/api/jolokia/?ignoreErrors=true&canonicalNaming=false'
3636
username: admin # default username
3737
password: admin # default password
38-
processors:
39-
- script:
40-
lang: javascript
41-
source: >
42-
function process(event) {
43-
var broker_memory_broker_pct = event.Get("activemq.broker.memory.broker.pct")
44-
if (broker_memory_broker_pct != null) {
45-
event.Put("activemq.broker.memory.broker.pct", broker_memory_broker_pct / 100.0)
46-
}
47-
48-
var broker_memory_temp_pct = event.Get("activemq.broker.memory.temp.pct")
49-
if (broker_memory_temp_pct != null) {
50-
event.Put("activemq.broker.memory.temp.pct", broker_memory_temp_pct / 100.0)
51-
}
52-
53-
var broker_memory_store_pct = event.Get("activemq.broker.memory.store.pct")
54-
if (broker_memory_store_pct != null) {
55-
event.Put("activemq.broker.memory.store.pct", broker_memory_store_pct / 100.0)
56-
}
57-
58-
var queue_memory_broker_pct = event.Get("activemq.queue.memory.broker.pct")
59-
if (queue_memory_broker_pct != null) {
60-
event.Put("activemq.queue.memory.broker.pct", queue_memory_broker_pct / 100.0)
61-
}
62-
63-
var topic_memory_broker_pct = event.Get("activemq.topic.memory.broker.pct")
64-
if (topic_memory_broker_pct != null) {
65-
event.Put("activemq.topic.memory.broker.pct", topic_memory_broker_pct / 100.0)
66-
}
67-
}
6838
----
6939

7040
[float]

metricbeat/docs/modules/ibmmq.asciidoc

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,26 +55,6 @@ metricbeat.modules:
5555
# This module uses the Prometheus collector metricset, all
5656
# the options for this metricset are also available here.
5757
metrics_path: /metrics
58-
59-
# The custom processor is responsible for filtering Prometheus metrics
60-
# not stricly related to the IBM MQ domain, e.g. system load, process,
61-
# metrics HTTP server.
62-
processors:
63-
- script:
64-
lang: javascript
65-
source: >
66-
function process(event) {
67-
var metrics = event.Get("prometheus.metrics");
68-
Object.keys(metrics).forEach(function(key) {
69-
if (!(key.match(/^ibmmq_.*$/))) {
70-
event.Delete("prometheus.metrics." + key);
71-
}
72-
});
73-
metrics = event.Get("prometheus.metrics");
74-
if (Object.keys(metrics).length == 0) {
75-
event.Cancel();
76-
}
77-
}
7858
----
7959

8060
It also supports the options described in <<module-http-config-options>>.

metricbeat/mb/lightmetricset.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/pkg/errors"
2222

2323
"github.com/elastic/beats/libbeat/common"
24+
"github.com/elastic/beats/libbeat/processors"
2425
)
2526

2627
// LightMetricSet contains the definition of a non-registered metric set
@@ -33,6 +34,7 @@ type LightMetricSet struct {
3334
MetricSet string `config:"metricset" validate:"required"`
3435
Defaults interface{} `config:"defaults"`
3536
} `config:"input" validate:"required"`
37+
Processors processors.PluginConfig `config:"processors"`
3638
}
3739

3840
// Registration obtains a metric set registration for this light metric set, this registration

metricbeat/mb/lightmodules.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/elastic/beats/libbeat/common"
3030
"github.com/elastic/beats/libbeat/logp"
31+
"github.com/elastic/beats/libbeat/processors"
3132
)
3233

3334
const (
@@ -150,6 +151,19 @@ type lightModuleConfig struct {
150151
MetricSets []string `config:"metricsets"`
151152
}
152153

154+
// ProcessorsForMetricSet returns processors defined for the light metricset.
155+
func (s *LightModulesSource) ProcessorsForMetricSet(r *Register, moduleName string, metricSetName string) (*processors.Processors, error) {
156+
module, err := s.loadModule(r, moduleName)
157+
if err != nil {
158+
return nil, errors.Wrapf(err, "reading processors for metricset '%s' in module '%s' failed", metricSetName, moduleName)
159+
}
160+
metricSet, ok := module.MetricSets[metricSetName]
161+
if !ok {
162+
return nil, fmt.Errorf("unknown metricset '%s' in module '%s'", metricSetName, moduleName)
163+
}
164+
return processors.New(metricSet.Processors)
165+
}
166+
153167
// LightModule contains the definition of a light module
154168
type LightModule struct {
155169
Name string

metricbeat/mb/lightmodules_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/elastic/beats/libbeat/common"
3030
"github.com/elastic/beats/libbeat/logp"
31+
_ "github.com/elastic/beats/libbeat/processors/add_id"
3132
)
3233

3334
// TestLightModulesAsModuleSource checks that registry correctly lists
@@ -302,6 +303,31 @@ func TestNewModulesCallModuleFactory(t *testing.T) {
302303
assert.True(t, called, "module factory must be called if registered")
303304
}
304305

306+
func TestProcessorsForMetricSet_UnknownModule(t *testing.T) {
307+
r := NewRegister()
308+
source := NewLightModulesSource("testdata/lightmodules")
309+
procs, err := source.ProcessorsForMetricSet(r, "nonexisting", "fake")
310+
require.Error(t, err)
311+
require.Nil(t, procs)
312+
}
313+
314+
func TestProcessorsForMetricSet_UnknownMetricSet(t *testing.T) {
315+
r := NewRegister()
316+
source := NewLightModulesSource("testdata/lightmodules")
317+
procs, err := source.ProcessorsForMetricSet(r, "unpack", "nonexisting")
318+
require.Error(t, err)
319+
require.Nil(t, procs)
320+
}
321+
322+
func TestProcessorsForMetricSet_ProcessorsRead(t *testing.T) {
323+
r := NewRegister()
324+
source := NewLightModulesSource("testdata/lightmodules")
325+
procs, err := source.ProcessorsForMetricSet(r, "unpack", "withprocessors")
326+
require.NoError(t, err)
327+
require.NotNil(t, procs)
328+
require.Len(t, procs.List, 1)
329+
}
330+
305331
type metricSetWithOption struct {
306332
BaseMetricSet
307333
Option string
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package module
19+
20+
import (
21+
"github.com/pkg/errors"
22+
23+
"github.com/elastic/beats/libbeat/cfgfile"
24+
"github.com/elastic/beats/libbeat/common"
25+
"github.com/elastic/beats/metricbeat/mb"
26+
)
27+
28+
// ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings.
29+
func ConfiguredModules(modulesData []*common.Config, configModulesData *common.Config, moduleOptions []Option) ([]*Wrapper, error) {
30+
var modules []*Wrapper
31+
32+
for _, moduleCfg := range modulesData {
33+
module, err := NewWrapper(moduleCfg, mb.Registry, nil)
34+
if err != nil {
35+
return nil, err
36+
}
37+
modules = append(modules, module)
38+
}
39+
40+
// Add dynamic modules
41+
if configModulesData.Enabled() {
42+
config := cfgfile.DefaultDynamicConfig
43+
configModulesData.Unpack(&config)
44+
45+
modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled")
46+
if err != nil {
47+
return nil, errors.Wrap(err, "initialization error")
48+
}
49+
50+
for _, file := range modulesManager.ListEnabled() {
51+
confs, err := cfgfile.LoadList(file.Path)
52+
if err != nil {
53+
return nil, errors.Wrap(err, "error loading config files")
54+
}
55+
for _, conf := range confs {
56+
m, err := NewWrapper(conf, mb.Registry, moduleOptions...)
57+
if err != nil {
58+
return nil, errors.Wrap(err, "module initialization error")
59+
}
60+
modules = append(modules, m)
61+
}
62+
}
63+
}
64+
return modules, nil
65+
}

0 commit comments

Comments
 (0)