@@ -41,6 +41,7 @@ import (
4141 "github.com/elastic/beats/v7/libbeat/management"
4242 "github.com/elastic/beats/v7/libbeat/monitoring"
4343 "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
44+ "github.com/elastic/beats/v7/libbeat/publisher/pipetool"
4445
4546 _ "github.com/elastic/beats/v7/filebeat/include"
4647
@@ -66,6 +67,7 @@ type Filebeat struct {
6667 config * cfg.Config
6768 moduleRegistry * fileset.ModuleRegistry
6869 done chan struct {}
70+ pipeline beat.PipelineConnector
6971}
7072
7173// New creates a new Filebeat pointer instance.
@@ -162,7 +164,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
162164 pipelineLoaderFactory := newPipelineLoaderFactory (b .Config .Output .Config ())
163165 modulesFactory := fileset .NewSetupFactory (b .Info , pipelineLoaderFactory )
164166 if fb .config .ConfigModules .Enabled () {
165- modulesLoader := cfgfile .NewReloader (b . Publisher , fb .config .ConfigModules )
167+ modulesLoader := cfgfile .NewReloader (fb . pipeline , fb .config .ConfigModules )
166168 modulesLoader .Load (modulesFactory )
167169 }
168170
@@ -235,8 +237,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
235237 return err
236238 }
237239
240+ fb .pipeline = pipetool .WithDefaultGuarantees (b .Publisher , beat .GuaranteedSend )
241+ fb .pipeline = withPipelineEventCounter (fb .pipeline , wgEvents )
242+
238243 outDone := make (chan struct {}) // outDone closes down all active pipeline connections
239- pipelineConnector := channel .NewOutletFactory (outDone , wgEvents , b . Info ).Create
244+ pipelineConnector := channel .NewOutletFactory (outDone ).Create
240245
241246 // Create a ES connection factory for dynamic modules pipeline loading
242247 var pipelineLoaderFactory fileset.PipelineLoaderFactory
@@ -246,7 +251,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
246251 logp .Warn (pipelinesWarning )
247252 }
248253
249- inputLoader := input .NewRunnerFactory (pipelineConnector , registrar , fb .done )
254+ inputLoader := channel .RunnerFactoryWithCommonInputSettings (b .Info ,
255+ input .NewRunnerFactory (pipelineConnector , registrar , fb .done ))
250256 moduleLoader := fileset .NewFactory (inputLoader , b .Info , pipelineLoaderFactory , config .OverwritePipelines )
251257
252258 crawler , err := newCrawler (inputLoader , moduleLoader , config .Inputs , fb .done , * once )
@@ -283,7 +289,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
283289 logp .Debug ("modules" , "Existing Ingest pipelines will be updated" )
284290 }
285291
286- err = crawler .Start (b . Publisher , config .ConfigInput , config .ConfigModules )
292+ err = crawler .Start (fb . pipeline , config .ConfigInput , config .ConfigModules )
287293 if err != nil {
288294 crawler .Stop ()
289295 return fmt .Errorf ("Failed to start crawler: %+v" , err )
@@ -300,17 +306,17 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
300306 }
301307
302308 // Register reloadable list of inputs and modules
303- inputs := cfgfile .NewRunnerList (management .DebugK , inputLoader , b . Publisher )
309+ inputs := cfgfile .NewRunnerList (management .DebugK , inputLoader , fb . pipeline )
304310 reload .Register .MustRegisterList ("filebeat.inputs" , inputs )
305311
306- modules := cfgfile .NewRunnerList (management .DebugK , moduleLoader , b . Publisher )
312+ modules := cfgfile .NewRunnerList (management .DebugK , moduleLoader , fb . pipeline )
307313 reload .Register .MustRegisterList ("filebeat.modules" , modules )
308314
309315 var adiscover * autodiscover.Autodiscover
310316 if fb .config .Autodiscover != nil {
311317 adiscover , err = autodiscover .NewAutodiscover (
312318 "filebeat" ,
313- b . Publisher ,
319+ fb . pipeline ,
314320 cfgfile .MultiplexedRunnerFactory (
315321 cfgfile .MatchHasField ("module" , moduleLoader ),
316322 cfgfile .MatchDefault (inputLoader ),
0 commit comments