Move processors into new pipeline#4554
Conversation
789983d to
fe75d24
Compare
There was a problem hiding this comment.
Didn't know this works, thought _ is required.
fe75d24 to
bbe6888
Compare
5a8b240 to
526cf2e
Compare
ruflin
left a comment
There was a problem hiding this comment.
LGTM. Few questions:
- There are quite a few changes on the JSON part. I wasn't sure how this is related to the other changes?
- If I understand it right, config wise nothing changes for fields / tags, it's just the internal handling as processor. Also these cannot be configured by a user a processor as they are not registered processors.
There was a problem hiding this comment.
the error has never been checked for. In this PR I'm trying to adjust interfaces, not fix/improve/change behavior in tests or other pieces.
libbeat/processors/processor.go
Outdated
There was a problem hiding this comment.
Could we add an "error" to the event in case a processor did not work so it can be seen in the event, that something went wrong?
There was a problem hiding this comment.
Same here. Old/Current logic does ignore errors and at most prints a debug message. I don't intend to change any processing logic.
There was a problem hiding this comment.
We probably should have that somewhere in the docs
526cf2e to
96eca70
Compare
There's some shared json parsing logic treating
Yep. The pipeline/processor.go defines some internal processors, not globally registered/configurable. |
96eca70 to
aab3cb1
Compare
aab3cb1 to
0a0bb10
Compare
- Update all processors to operate in *beat.Event - Update processors factory to use `*common.Config`, not `common.Config`. `common.Config` works, but go-ucfg interfaces/API assume to operate on pointers (passing by value might break functionality in future?) - Update most processors interface to pass the processors type/context by pointer instead of by value (due to use of interfaces, the processors have been allocated on heap anyways). - Install list of processors in new publisher pipeline. This allows publisher pipeline to account for events being dropped by filters, when reporting ACKs. From beats point of view, there should be no difference between dropped events (by filters) and ACKed events (by outputs). Interface mandates all events being properly ACKed by the publisher pipeline. - Tags, Fields and beat meta data settings are now implicitely converted to Processors as well - -> change ensures, all metadata (local and global) being applied before the actual processors (installed with the pipeline) are executed. Ensures all processors will see the very same events. - introduce `(*processors.Processors).RunBC(common.MapStr) common.MapStr`. Will be removed, once the beats themselves configure publisher pipeline to run processors (filebeat/metricbeat are executing processors on their own). - add `Private` field to `beat.Event` for event-based ACK callbacks. Use `Private` field to store additional event-metadata required for post-processing in the ACK callback (as processor pipeline can change event at will) - Prepare moving beats to new pipeline, but introducing `ConnectX` to BC API, for beats to connect to shared global pipeline (Note: will be removed when BC layer is removed). - Processors are still executed by the go-routine calling `(beat.Client).Publish`. - New Processor Execution Order (C=client based processor, P=pipeline based processor) 1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4) 2. (P) generalize/normalize event 3. (P) add beats metadata (name, hostname, version) 4. (P) add pipeline fields + tags 5. (C) add client fields + tags 6. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4) 7. (C) client processors list 8. (P) pipeline processors list 9. (P) (if publish/debug enabled) log event 10. (P) (if output disabled) dropEvent client processors higher prio
0a0bb10 to
6f47a11
Compare
|
I'm removing the needs_docs label because any outstanding doc work required here is tracked by #4598 |
*common.Config, notcommon.Config.common.Configworks, but go-ucfg interfaces/API assume to operate on pointers (passing by value might break functionality in future?)actual processors (installed with the pipeline) are executed. Ensures all
processors will see the very same events.
(*processors.Processors).RunBC(common.MapStr) common.MapStr. Will be removed, once the beats themselves configure publisher pipeline to run processors (filebeat/metricbeat are executing processors on their own).Privatefield tobeat.Eventfor event-based ACK callbacks. UsePrivatefield to store additional event-metadata required for post-processing in the ACK callback (as processor pipeline can change event at will)ConnectXto BC API, for beats to connect to shared global pipeline (Note: will be removed when BC layer is removed).(beat.Client).Publish.For docs: event processing execution order would be nice to have.