Update libbeat publisher pipeline #4492
Conversation
I have had to skip the test, as it has been very unreliable and I didn't manage to reproduce it yet for investigation. Plus, this feature as-is will be replaced by wait-on-shutdown support provided directly by the new publisher pipeline.
can you perhaps open a follow up github issue to track these things?
It's the only test I have had to skip. But I will create a meta-ticket for further pipeline work.
libbeat/publisher/beat/bc.go
For reviewer notes:

@urso I hit an error when using an output config section like this: [...]. The error is: [...]. Expected?

@tsg this is to be expected. From notes: with this PR one can have only one output configured. The error message is generated by the beats config handling. [...]
ruflin left a comment
I skimmed through and left some remarks. In general LGTM and we can move forward on this.
libbeat/logp/logger.go
why did you touch the logger in this PR?
Some parts of the pipeline allow a logger to be passed in by interface/type. This allows the more 'expensive' tests to capture the log output in the test context, correctly grouping log messages with test output.
The old internal Logger type is renamed to logger. We now export a *Logger type with a configurable selector.
I assume I will find all the defaults below somewhere else :-)
This is the monitoring reporter, not the actual Elasticsearch event output.
The backoff parameters have become configurable, as one has to (will have to) configure the backoff strategy when creating the outputs.Group. This is due to this PR shifting some responsibilities from the outputs to the publisher pipeline itself (e.g. error handling, backoff, retry). Some of this shifting will be necessary when introducing dynamic output reloading, as the pipeline must transfer active events from old to new outputs.
libbeat/monitoring/report/report.go
Nice. I see the possibility coming of not having an output initially ;-)
libbeat/outputs/backoff.go
Reminds me of the filebeat readers implementation ;-)
yeah. It's kind of a wrapper for NetworkClient. This will be moved closer to the libbeat pipeline itself. Backoff strategy + parameters will become config parameters in outputs.Group.
libbeat/outputs/logstash/logstash.go
Did all this stuff get replaced by new publisher?
Yes. I modified the pipeline and output interfaces to be of one kind only: outputs always get batches, and on the pipeline's input side events are always pushed one at a time. This simplifies/removes some extra logic in the outputs and the pipeline itself for dealing with bulk and non-bulk event handling.
seems we have this code in different places
Different places? The bc client is adapting the event before pushing it to the new pipeline. Maybe you mean in some unit tests?
Originally I had my own logger interface, to correctly capture log output within a test context (so log output and test names make more sense if something goes wrong). But when adapting the pipeline to libbeat I opted for using logp. This forced the introduction of withLogOutput, capturing stderr into t.Log, so I can still make sense of error logs mixed with pipeline processing logs.
we could generate this in the future ... but as we are not going to add outputs ...
libbeat/tests/system/beat/beat.py
?
I'm removing @metadata from events. All outputs but Elasticsearch print the events in JSON format, as expected by logstash, including @metadata. So our beats->LS->ES configs are valid even if kafka or redis is used. With @metadata being somewhat private, we never documented these fields. That is, I have to remove the fields, as a number of system tests check that all fields in an event are documented.
@tsg looking into it. Interestingly, the test is completely unrelated to any changes in this PR and passes for me. Did restart the travis job.
@tsg did fix the test, let's wait for travis.
This change marks the beginning of the libbeat event publisher pipeline
refactoring.
- central to the publisher pipeline is the broker:
  - the broker implementation can be configured when constructing the pipeline
  - common broker implementation tests live in the `brokertest` package
  - broker features:
    - fully in control of all published events: in comparison to the old
      publisher pipeline with many batches in flight, the broker now
      configures/controls the total number of events stored in the publisher
      pipeline; only after ACKs from the outputs does new space become
      available
    - the broker returns ACKs to the publisher in the correct order
    - the broker batches up multiple ACKs
    - a producer can only send one event at a time to the broker (push)
    - a consumer can only receive batches of events from the broker (pull)
    - a producer can cancel (remove) active events not yet pulled by a consumer
- broker/output related interfaces are defined in the `publisher` package
- pipeline/client interfaces for use by beats are currently defined in the
  `publisher/beat` package
- the event structure has been changed to be more compatible with Logstash (see
  `beat.Event`): beats can send metadata to libbeat outputs (e.g. `pipeline`)
  and logstash by using the `Event.Meta` field. Event fields are stored in
  `Event.Fields` and are normalized (for use with processors) before being
  serialized.
- the old publisher's publish API is moved to `libbeat/publisher/bc/publisher`
  for now:
  - moved to a new sub-package to fend off circular imports
  - the package implements the old pipeline API on top of the new pipeline
  - filters/processors are still executed before pushing events to the new
    pipeline
- New API:
  - beats client requirements are configured via `beat.ClientConfig`:
    - register async ACK callbacks (currently callbacks will not be triggered
      after `Client.Close`)
    - configurable sending guarantees (must match ACK support)
    - "wait on close", for beats clients to wait for pending events to be
      ACKed (only if ACK is configured)
  - the pipeline also supports "wait on close", waiting for pending events
    (independent of ACK configuration); can be used by any beat to wait on
    shutdown for published events to actually be sent
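The producer-push / consumer-pull contract described above can be sketched with plain Go channels. This is a toy illustration only (the `Event` type, batch size, and channel layout are made up); the real broker additionally tracks ACK ordering and the total number of stored events:

```go
package main

import "fmt"

// Event is a stand-in for the pipeline event type (hypothetical).
type Event struct{ ID int }

// broker accepts single-event pushes from producers and hands out
// whole batches to consumers, mirroring the push/pull contract.
type broker struct {
	in  chan Event   // producers push one event at a time
	out chan []Event // consumers pull batches of events
}

func newBroker(batchSize int) *broker {
	b := &broker{in: make(chan Event), out: make(chan []Event)}
	go func() {
		var batch []Event
		for ev := range b.in {
			batch = append(batch, ev)
			if len(batch) == batchSize {
				b.out <- batch
				batch = nil
			}
		}
		if len(batch) > 0 { // flush the final partial batch
			b.out <- batch
		}
		close(b.out)
	}()
	return b
}

func main() {
	b := newBroker(2)
	go func() {
		for i := 1; i <= 4; i++ {
			b.in <- Event{ID: i} // producer: one event per send
		}
		close(b.in)
	}()
	for batch := range b.out { // consumer: receives batches
		fmt.Println(len(batch))
	}
	// prints "2" twice: four single-event pushes become two 2-event batches
}
```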
Event Structure:
----------------
The event structure has been changed a little:
```
type Event struct {
Timestamp time.Time
Meta common.MapStr
Fields common.MapStr
}
```
- We always require the timestamp.
- Meta contains additional metadata (hints?) a beat can forward to the
  outputs. For example `pipeline` or `index` settings for the Elasticsearch
  output.
- If the output is not Elasticsearch, a `@metadata` field will always be
  written to the JSON document. This way Logstash can take advantage of all
  `@metadata`, even if the event has been sent via kafka or redis.
The new output plugin factory is defined as:
```
type Factory func(beat common.BeatInfo, cfg *common.Config) (Group, error)
```
The package libbeat/output/mode is being removed and all its functionality is
moved into a single implementation in the publisher pipeline, supporting
sync/async clients with failover and load-balancing. In the future, dynamic
output discovery might be added as well. This change requires output.Group to
return some common settings for an active output, to configure the pipeline:
```
// Group configures and combines multiple clients into load-balanced group of
// clients being managed by the publisher pipeline.
type Group struct {
Clients []Client
BatchSize int
Retry int
}
```
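A minimal, hypothetical factory following this shape might look as follows. The stand-in types only mirror the signatures shown above, and the concrete values (batch size, retry count) are invented for illustration:

```go
package main

import "fmt"

// Minimal stand-ins for the real libbeat types (hypothetical).
type BeatInfo struct{ Beat string }
type Config struct{}
type Client interface{ Publish(events []string) error }

// Group carries the clients plus the pipeline-facing settings.
type Group struct {
	Clients   []Client
	BatchSize int
	Retry     int
}

// Factory mirrors the output plugin factory signature from this PR.
type Factory func(beat BeatInfo, cfg *Config) (Group, error)

type consoleClient struct{}

func (consoleClient) Publish(events []string) error {
	for _, e := range events {
		fmt.Println(e)
	}
	return nil
}

// makeConsole sketches a factory: it builds the clients and returns the
// settings the pipeline needs (batch size, retry count) in one Group.
func makeConsole(beat BeatInfo, cfg *Config) (Group, error) {
	return Group{
		Clients:   []Client{consoleClient{}},
		BatchSize: 2048,
		Retry:     3,
	}, nil
}

func main() {
	var factory Factory = makeConsole
	g, err := factory(BeatInfo{Beat: "testbeat"}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(g.Clients), g.BatchSize) // prints "1 2048"
}
```

Returning the batch size and retry policy alongside the clients is what lets the pipeline, rather than each output, drive batching and retries.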
Moving functionality from the outputs to the publisher pipeline restricts
beats to having only one output type configured.
All configured client instances will participate in load-balancing, driven by
the publisher pipeline. This removes some intermediate workers used for
forwarding batches. Future changes to groups include: [...]
Outputs always operate in batches by implementing only the `Publish` method:
```
// Publish sends events to the client's sink. A client must synchronously or
// asynchronously ACK the given batch, once all events have been processed.
// Using Retry/Cancelled a client can return a batch of unprocessed events to
// the publisher pipeline. The publisher pipeline (if configured by the output
// factory) will take care of retrying/dropping events.
Publish(publisher.Batch) error
```
With:
```
// Batch is used to pass a batch of events to the outputs and to asynchronously
// listen for signals from these outputs. After a batch is processed (completed
// or errors), one of the signal methods must be called.
type Batch interface {
Events() []Event
// signals
ACK()
Drop()
Retry()
RetryEvents(events []Event)
Cancelled()
CancelledEvents(events []Event)
}
```
The batch interface combines `events + signaling` into one common interface.
The main difference between sync and async clients is when `batch.ACK` is
called. Batches/events can be processed out of order. The publisher pipeline,
which does the batching and load-balancing, guarantees that ACKs are returned
to the beat in order and enforces an upper bound on in-flight events. Once the
publisher pipeline is 'full', it will block, waiting for ACKs from outputs.
The logic for dropping events on retry and guaranteed sending is moved to the
publisher pipeline as well. Outputs are concerned with publishing and signaling
ACK or Retry only.
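To illustrate the contract, here is a toy batch implementation and an output publish sketch that either ACKs the whole batch or hands failed events back via `RetryEvents`. All types here are simplified stand-ins for the real `publisher.Batch`, and the failure rule is invented for the example:

```go
package main

import "fmt"

type Event struct{ ID int }

// Batch mirrors a subset of the publisher.Batch interface (simplified).
type Batch interface {
	Events() []Event
	ACK()
	RetryEvents(events []Event)
}

// memBatch is a toy batch recording the signal it received; the real
// pipeline provides its own batch implementation.
type memBatch struct {
	events []Event
	acked  bool
	retry  []Event
}

func (b *memBatch) Events() []Event                { return b.events }
func (b *memBatch) ACK()                           { b.acked = true }
func (b *memBatch) RetryEvents(events []Event)     { b.retry = events }

// publish pretends the sink fails on even IDs, returning those events to the
// pipeline for retry; otherwise it ACKs the whole batch.
func publish(b Batch) error {
	var failed []Event
	for _, ev := range b.Events() {
		if ev.ID%2 == 0 {
			failed = append(failed, ev)
		}
	}
	if len(failed) > 0 {
		b.RetryEvents(failed) // hand unprocessed events back to the pipeline
		return nil
	}
	b.ACK() // all events processed: signal completion
	return nil
}

func main() {
	b := &memBatch{events: []Event{{1}, {2}, {3}}}
	publish(b)
	fmt.Println(b.acked, len(b.retry)) // prints "false 1"
}
```

Note that dropping-versus-retrying the returned events is decided by the pipeline (per the output factory's configuration), not by the output itself.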
Write the packetbeat log output to the test output in case the packetbeat exit code does not match the expected exit code.
"Producer cancel" is a feature that allows closing queue producers to also cancel any pending events created by that producer that have not yet been sent to a queue reader. It was introduced as a small part of a [very large refactor](#4492) in 2017, but current code doesn't depend on it for anything. Since this feature adds considerable complexity to the queue API and implementation, this PR removes the feature and associated helpers. This PR should cause no user-visible behavior change.
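For context, the removed semantics can be illustrated with a toy producer: events buffered but not yet pulled by a reader are dropped when the producer closes. This is a hypothetical sketch, not the actual queue code:

```go
package main

import "fmt"

type Event struct{ ID int }

// producer sketches the removed "producer cancel" semantics: events that
// were published but not yet handed to a queue reader are cancelled when
// the producer closes.
type producer struct {
	pending []Event // events not yet pulled by a reader
}

func (p *producer) Publish(ev Event) { p.pending = append(p.pending, ev) }

// Close cancels all pending events and reports how many were removed.
func (p *producer) Close() int {
	n := len(p.pending)
	p.pending = nil
	return n
}

func main() {
	p := &producer{}
	p.Publish(Event{1})
	p.Publish(Event{2})
	fmt.Println(p.Close()) // prints "2": both pending events were cancelled
}
```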
Upcoming
--------
- remove the `queue_size` and `bulk_queue_size` settings in favor of the
  configurable broker
- use `beat.Event` instead of `common.MapStr`
- make `@metadata` accessible from processors?
- remove the `bc/publisher` package
- move codec configuration into `outputs.Group`, so codecs can be applied
  earlier in the pipeline (=> limit events per batch/queue by memory usage)