Skip to content

Commit 0aee9cb

Browse files
committed
Merge remote-tracking branch 'origin/master' into aws-jenkins
2 parents f92d499 + bc8123a commit 0aee9cb

12 files changed

Lines changed: 421 additions & 183 deletions

File tree

filebeat/beater/channels.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import (
2222

2323
"github.com/elastic/beats/v7/filebeat/input/file"
2424
"github.com/elastic/beats/v7/filebeat/registrar"
25+
"github.com/elastic/beats/v7/libbeat/beat"
2526
"github.com/elastic/beats/v7/libbeat/monitoring"
27+
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
2628
)
2729

2830
type registrarLogger struct {
@@ -41,6 +43,23 @@ type eventCounter struct {
4143
wg sync.WaitGroup
4244
}
4345

46+
// countingClient adds and substracts from a counter when events have been
47+
// published, dropped or ACKed. The countingClient can be used to keep track of
48+
// inflight events for a beat.Client instance. The counter is updated after the
49+
// client has been disconnected from the publisher pipeline via 'Closed'.
50+
type countingClient struct {
51+
counter *eventCounter
52+
client beat.Client
53+
}
54+
55+
type countingEventer struct {
56+
wgEvents *eventCounter
57+
}
58+
59+
type combinedEventer struct {
60+
a, b beat.ClientEventer
61+
}
62+
4463
func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
4564
return &registrarLogger{
4665
done: make(chan struct{}),
@@ -87,3 +106,75 @@ func (c *eventCounter) Done() {
87106
func (c *eventCounter) Wait() {
88107
c.wg.Wait()
89108
}
109+
110+
// withPipelineEventCounter adds a counter to the pipeline that keeps track of
111+
// all events published, dropped and ACKed by any active client.
112+
// The type accepted by counter is compatible with sync.WaitGroup.
113+
func withPipelineEventCounter(pipeline beat.PipelineConnector, counter *eventCounter) beat.PipelineConnector {
114+
counterListener := &countingEventer{counter}
115+
116+
pipeline = pipetool.WithClientConfigEdit(pipeline, func(config beat.ClientConfig) (beat.ClientConfig, error) {
117+
if evts := config.Events; evts != nil {
118+
config.Events = &combinedEventer{evts, counterListener}
119+
} else {
120+
config.Events = counterListener
121+
}
122+
return config, nil
123+
})
124+
125+
pipeline = pipetool.WithClientWrapper(pipeline, func(client beat.Client) beat.Client {
126+
return &countingClient{
127+
counter: counter,
128+
client: client,
129+
}
130+
})
131+
return pipeline
132+
}
133+
134+
func (c *countingClient) Publish(event beat.Event) {
135+
c.counter.Add(1)
136+
c.client.Publish(event)
137+
}
138+
139+
func (c *countingClient) PublishAll(events []beat.Event) {
140+
c.counter.Add(len(events))
141+
c.client.PublishAll(events)
142+
}
143+
144+
func (c *countingClient) Close() error {
145+
return c.client.Close()
146+
}
147+
148+
func (*countingEventer) Closing() {}
149+
func (*countingEventer) Closed() {}
150+
func (*countingEventer) Published() {}
151+
152+
func (c *countingEventer) FilteredOut(_ beat.Event) {}
153+
func (c *countingEventer) DroppedOnPublish(_ beat.Event) {
154+
c.wgEvents.Done()
155+
}
156+
157+
func (c *combinedEventer) Closing() {
158+
c.a.Closing()
159+
c.b.Closing()
160+
}
161+
162+
func (c *combinedEventer) Closed() {
163+
c.a.Closed()
164+
c.b.Closed()
165+
}
166+
167+
func (c *combinedEventer) Published() {
168+
c.a.Published()
169+
c.b.Published()
170+
}
171+
172+
func (c *combinedEventer) FilteredOut(event beat.Event) {
173+
c.a.FilteredOut(event)
174+
c.b.FilteredOut(event)
175+
}
176+
177+
func (c *combinedEventer) DroppedOnPublish(event beat.Event) {
178+
c.a.DroppedOnPublish(event)
179+
c.b.DroppedOnPublish(event)
180+
}

filebeat/beater/crawler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func newCrawler(
6262

6363
// Start starts the crawler with all inputs
6464
func (c *crawler) Start(
65-
pipeline beat.Pipeline,
65+
pipeline beat.PipelineConnector,
6666
configInputs *common.Config,
6767
configModules *common.Config,
6868
) error {
@@ -111,7 +111,7 @@ func (c *crawler) Start(
111111
}
112112

113113
func (c *crawler) startInput(
114-
pipeline beat.Pipeline,
114+
pipeline beat.PipelineConnector,
115115
config *common.Config,
116116
) error {
117117
if !config.Enabled() {

filebeat/beater/filebeat.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/elastic/beats/v7/libbeat/management"
4242
"github.com/elastic/beats/v7/libbeat/monitoring"
4343
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
44+
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
4445

4546
_ "github.com/elastic/beats/v7/filebeat/include"
4647

@@ -66,6 +67,7 @@ type Filebeat struct {
6667
config *cfg.Config
6768
moduleRegistry *fileset.ModuleRegistry
6869
done chan struct{}
70+
pipeline beat.PipelineConnector
6971
}
7072

7173
// New creates a new Filebeat pointer instance.
@@ -162,7 +164,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
162164
pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config())
163165
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory)
164166
if fb.config.ConfigModules.Enabled() {
165-
modulesLoader := cfgfile.NewReloader(b.Publisher, fb.config.ConfigModules)
167+
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
166168
modulesLoader.Load(modulesFactory)
167169
}
168170

@@ -235,8 +237,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
235237
return err
236238
}
237239

240+
fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend)
241+
fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents)
242+
238243
outDone := make(chan struct{}) // outDone closes down all active pipeline connections
239-
pipelineConnector := channel.NewOutletFactory(outDone, wgEvents, b.Info).Create
244+
pipelineConnector := channel.NewOutletFactory(outDone).Create
240245

241246
// Create a ES connection factory for dynamic modules pipeline loading
242247
var pipelineLoaderFactory fileset.PipelineLoaderFactory
@@ -246,7 +251,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
246251
logp.Warn(pipelinesWarning)
247252
}
248253

249-
inputLoader := input.NewRunnerFactory(pipelineConnector, registrar, fb.done)
254+
inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info,
255+
input.NewRunnerFactory(pipelineConnector, registrar, fb.done))
250256
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)
251257

252258
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
@@ -283,7 +289,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
283289
logp.Debug("modules", "Existing Ingest pipelines will be updated")
284290
}
285291

286-
err = crawler.Start(b.Publisher, config.ConfigInput, config.ConfigModules)
292+
err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
287293
if err != nil {
288294
crawler.Stop()
289295
return fmt.Errorf("Failed to start crawler: %+v", err)
@@ -300,17 +306,17 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
300306
}
301307

302308
// Register reloadable list of inputs and modules
303-
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, b.Publisher)
309+
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline)
304310
reload.Register.MustRegisterList("filebeat.inputs", inputs)
305311

306-
modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, b.Publisher)
312+
modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline)
307313
reload.Register.MustRegisterList("filebeat.modules", modules)
308314

309315
var adiscover *autodiscover.Autodiscover
310316
if fb.config.Autodiscover != nil {
311317
adiscover, err = autodiscover.NewAutodiscover(
312318
"filebeat",
313-
b.Publisher,
319+
fb.pipeline,
314320
cfgfile.MultiplexedRunnerFactory(
315321
cfgfile.MatchHasField("module", moduleLoader),
316322
cfgfile.MatchDefault(inputLoader),

filebeat/channel/connector.go

Lines changed: 1 addition & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ package channel
2020
import (
2121
"github.com/elastic/beats/v7/libbeat/beat"
2222
"github.com/elastic/beats/v7/libbeat/common"
23-
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
24-
"github.com/elastic/beats/v7/libbeat/processors"
25-
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
2623
)
2724

2825
// ConnectorFunc is an adapter for using ordinary functions as Connector.
@@ -48,96 +45,15 @@ func (c *pipelineConnector) Connect(cfg *common.Config) (Outleter, error) {
4845
}
4946

5047
func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) {
51-
config := inputOutletConfig{}
52-
if err := cfg.Unpack(&config); err != nil {
53-
return nil, err
54-
}
55-
56-
procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg)
57-
if err != nil {
58-
return nil, err
59-
}
60-
61-
setOptional := func(to common.MapStr, key string, value string) {
62-
if value != "" {
63-
to.Put(key, value)
64-
}
65-
}
66-
67-
meta := clientCfg.Processing.Meta.Clone()
68-
fields := clientCfg.Processing.Fields.Clone()
69-
70-
serviceType := config.ServiceType
71-
if serviceType == "" {
72-
serviceType = config.Module
73-
}
74-
75-
setOptional(meta, "pipeline", config.Pipeline)
76-
setOptional(fields, "fileset.name", config.Fileset)
77-
setOptional(fields, "service.type", serviceType)
78-
setOptional(fields, "input.type", config.Type)
79-
if config.Module != "" {
80-
event := common.MapStr{"module": config.Module}
81-
if config.Fileset != "" {
82-
event["dataset"] = config.Module + "." + config.Fileset
83-
}
84-
fields["event"] = event
85-
}
86-
87-
mode := clientCfg.PublishMode
88-
if mode == beat.DefaultGuarantees {
89-
mode = beat.GuaranteedSend
90-
}
91-
9248
// connect with updated configuration
93-
clientCfg.PublishMode = mode
94-
clientCfg.Processing.EventMetadata = config.EventMetadata
95-
clientCfg.Processing.Meta = meta
96-
clientCfg.Processing.Fields = fields
97-
clientCfg.Processing.Processor = procs
98-
clientCfg.Processing.KeepNull = config.KeepNull
9949
client, err := c.pipeline.ConnectWith(clientCfg)
10050
if err != nil {
10151
return nil, err
10252
}
10353

104-
outlet := newOutlet(client, c.parent.wgEvents)
54+
outlet := newOutlet(client)
10555
if c.parent.done != nil {
10656
return CloseOnSignal(outlet, c.parent.done), nil
10757
}
10858
return outlet, nil
10959
}
110-
111-
// processorsForConfig assembles the Processors for a pipelineConnector.
112-
func processorsForConfig(
113-
beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig,
114-
) (*processors.Processors, error) {
115-
procs := processors.NewList(nil)
116-
117-
// Processor ordering is important:
118-
// 1. Index configuration
119-
if !config.Index.IsEmpty() {
120-
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
121-
timestampFormat, err :=
122-
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
123-
if err != nil {
124-
return nil, err
125-
}
126-
indexProcessor := add_formatted_index.New(timestampFormat)
127-
procs.AddProcessor(indexProcessor)
128-
}
129-
130-
// 2. ClientConfig processors
131-
if lst := clientCfg.Processing.Processor; lst != nil {
132-
procs.AddProcessor(lst)
133-
}
134-
135-
// 3. User processors
136-
userProcessors, err := processors.New(config.Processors)
137-
if err != nil {
138-
return nil, err
139-
}
140-
procs.AddProcessors(*userProcessors)
141-
142-
return procs, nil
143-
}

filebeat/channel/factory.go

Lines changed: 2 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,65 +19,17 @@ package channel
1919

2020
import (
2121
"github.com/elastic/beats/v7/libbeat/beat"
22-
"github.com/elastic/beats/v7/libbeat/common"
23-
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
24-
"github.com/elastic/beats/v7/libbeat/processors"
2522
)
2623

2724
type OutletFactory struct {
2825
done <-chan struct{}
29-
30-
eventer beat.ClientEventer
31-
wgEvents eventCounter
32-
beatInfo beat.Info
33-
}
34-
35-
type eventCounter interface {
36-
Add(n int)
37-
Done()
38-
}
39-
40-
// clientEventer adjusts wgEvents if events are dropped during shutdown.
41-
type clientEventer struct {
42-
wgEvents eventCounter
43-
}
44-
45-
// inputOutletConfig defines common input settings
46-
// for the publisher pipeline.
47-
type inputOutletConfig struct {
48-
// event processing
49-
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
50-
Processors processors.PluginConfig `config:"processors"`
51-
KeepNull bool `config:"keep_null"`
52-
53-
// implicit event fields
54-
Type string `config:"type"` // input.type
55-
ServiceType string `config:"service.type"` // service.type
56-
57-
// hidden filebeat modules settings
58-
Module string `config:"_module_name"` // hidden setting
59-
Fileset string `config:"_fileset_name"` // hidden setting
60-
61-
// Output meta data settings
62-
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
63-
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
6426
}
6527

6628
// NewOutletFactory creates a new outlet factory for
6729
// connecting an input to the publisher pipeline.
68-
func NewOutletFactory(
69-
done <-chan struct{},
70-
wgEvents eventCounter,
71-
beatInfo beat.Info,
72-
) *OutletFactory {
30+
func NewOutletFactory(done <-chan struct{}) *OutletFactory {
7331
o := &OutletFactory{
74-
done: done,
75-
wgEvents: wgEvents,
76-
beatInfo: beatInfo,
77-
}
78-
79-
if wgEvents != nil {
80-
o.eventer = &clientEventer{wgEvents}
32+
done: done,
8133
}
8234

8335
return o
@@ -90,9 +42,3 @@ func NewOutletFactory(
9042
// Create returns a Connector that connects inputs to the given publisher
// pipeline, using this factory's settings.
func (f *OutletFactory) Create(p beat.PipelineConnector) Connector {
	return &pipelineConnector{parent: f, pipeline: p}
}
93-
94-
func (e *clientEventer) Closing() {}
95-
func (e *clientEventer) Closed() {}
96-
func (e *clientEventer) Published() {}
97-
func (e *clientEventer) FilteredOut(evt beat.Event) {}
98-
func (e *clientEventer) DroppedOnPublish(evt beat.Event) { e.wgEvents.Done() }

0 commit comments

Comments
 (0)