@@ -20,18 +20,16 @@ package beater
2020import (
2121 "sync"
2222
23- "github.com/elastic/beats/libbeat/common/reload"
24- "github.com/elastic/beats/libbeat/management"
25- "github.com/elastic/beats/libbeat/paths"
26-
27- "github.com/joeshaw/multierror"
2823 "github.com/pkg/errors"
2924
3025 "github.com/elastic/beats/libbeat/autodiscover"
3126 "github.com/elastic/beats/libbeat/beat"
3227 "github.com/elastic/beats/libbeat/cfgfile"
3328 "github.com/elastic/beats/libbeat/common"
29+ "github.com/elastic/beats/libbeat/common/reload"
3430 "github.com/elastic/beats/libbeat/logp"
31+ "github.com/elastic/beats/libbeat/management"
32+ "github.com/elastic/beats/libbeat/paths"
3533 "github.com/elastic/beats/metricbeat/mb"
3634 "github.com/elastic/beats/metricbeat/mb/module"
3735
@@ -44,20 +42,15 @@ import (
4442
4543// Metricbeat implements the Beater interface for metricbeat.
4644type Metricbeat struct {
47- done chan struct {} // Channel used to initiate shutdown.
48- modules []staticModule // Active list of modules .
45+ done chan struct {} // Channel used to initiate shutdown.
46+ runners []module. Runner // Active list of module runners .
4947 config Config
5048 autodiscover * autodiscover.Autodiscover
5149
5250 // Options
5351 moduleOptions []module.Option
5452}
5553
56- type staticModule struct {
57- connector * module.Connector
58- module * module.Wrapper
59- }
60-
6154// Option specifies some optional arguments used for configuring the behavior
6255// of the Metricbeat framework.
6356type Option func (mb * Metricbeat )
@@ -162,46 +155,28 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
162155 moduleOptions := append (
163156 []module.Option {module .WithMaxStartDelay (config .MaxStartDelay )},
164157 metricbeat .moduleOptions ... )
165- var errs multierror.Errors
158+
159+ factory := module .NewFactory (b .Info , moduleOptions ... )
160+
166161 for _ , moduleCfg := range config .Modules {
167162 if ! moduleCfg .Enabled () {
168163 continue
169164 }
170165
171- failed := false
172-
173- connector , err := module .NewConnector (b .Info , b .Publisher , moduleCfg , nil )
166+ runner , err := factory .Create (b .Publisher , moduleCfg , nil )
174167 if err != nil {
175- errs = append (errs , err )
176- failed = true
177- }
178-
179- module , err := module .NewWrapper (moduleCfg , mb .Registry , moduleOptions ... )
180- if err != nil {
181- errs = append (errs , err )
182- failed = true
183- }
184-
185- if failed {
186- continue
168+ return nil , err
187169 }
188170
189- metricbeat .modules = append (metricbeat .modules , staticModule {
190- connector : connector ,
191- module : module ,
192- })
171+ metricbeat .runners = append (metricbeat .runners , runner )
193172 }
194173
195- if err := errs .Err (); err != nil {
196- return nil , err
197- }
198- if len (metricbeat .modules ) == 0 && ! dynamicCfgEnabled {
174+ if len (metricbeat .runners ) == 0 && ! dynamicCfgEnabled {
199175 return nil , mb .ErrAllModulesDisabled
200176 }
201177
202178 if config .Autodiscover != nil {
203179 var err error
204- factory := module .NewFactory (b .Info , metricbeat .moduleOptions ... )
205180 adapter := autodiscover .NewFactoryAdapter (factory )
206181 metricbeat .autodiscover , err = autodiscover .NewAutodiscover ("metricbeat" , b .Publisher , adapter , config .Autodiscover )
207182 if err != nil {
@@ -220,20 +195,16 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
220195func (bt * Metricbeat ) Run (b * beat.Beat ) error {
221196 var wg sync.WaitGroup
222197
223- // Static modules (metricbeat.modules)
224- for _ , m := range bt .modules {
225- client , err := m .connector .Connect ()
226- if err != nil {
227- return err
228- }
229-
230- r := module .NewRunner (client , m .module )
198+ // Static modules (metricbeat.runners)
199+ for _ , r := range bt .runners {
231200 r .Start ()
232201 wg .Add (1 )
202+
203+ thatRunner := r
233204 go func () {
234205 defer wg .Done ()
235206 <- bt .done
236- r .Stop ()
207+ thatRunner .Stop ()
237208 }()
238209 }
239210
@@ -289,38 +260,7 @@ func (bt *Metricbeat) Stop() {
289260 close (bt .done )
290261}
291262
292- // Modules return a list of all configured modules, including anyone present
293- // under dynamic config settings.
263+ // Modules return a list of all configured modules.
294264func (bt * Metricbeat ) Modules () ([]* module.Wrapper , error ) {
295- var modules []* module.Wrapper
296- for _ , m := range bt .modules {
297- modules = append (modules , m .module )
298- }
299-
300- // Add dynamic modules
301- if bt .config .ConfigModules .Enabled () {
302- config := cfgfile .DefaultDynamicConfig
303- bt .config .ConfigModules .Unpack (& config )
304-
305- modulesManager , err := cfgfile .NewGlobManager (config .Path , ".yml" , ".disabled" )
306- if err != nil {
307- return nil , errors .Wrap (err , "initialization error" )
308- }
309-
310- for _ , file := range modulesManager .ListEnabled () {
311- confs , err := cfgfile .LoadList (file .Path )
312- if err != nil {
313- return nil , errors .Wrap (err , "error loading config files" )
314- }
315- for _ , conf := range confs {
316- m , err := module .NewWrapper (conf , mb .Registry , bt .moduleOptions ... )
317- if err != nil {
318- return nil , errors .Wrap (err , "module initialization error" )
319- }
320- modules = append (modules , m )
321- }
322- }
323- }
324-
325- return modules , nil
265+ return module .ConfiguredModules (bt .config .Modules , bt .config .ConfigModules , bt .moduleOptions )
326266}
0 commit comments