Skip to content

Move processors into new pipeline#4554

Merged
ruflin merged 1 commit intoelastic:masterfrom
urso:enh/pipeline-processors
Jul 3, 2017
Merged

Move processors into new pipeline#4554
ruflin merged 1 commit intoelastic:masterfrom
urso:enh/pipeline-processors

Conversation

@urso
Copy link
Copy Markdown

@urso urso commented Jun 24, 2017

  • Update all processors to operate in *beat.Event
  • Update processors factory to use *common.Config, not common.Config. common.Config works, but go-ucfg interfaces/API assume to operate on pointers (passing by value might break functionality in future?)
  • Update most processors interface to pass the processors type/context by pointer instead of by value (due to use of interfaces, the processors have been allocated on heap anyways).
  • Install list of processors in new publisher pipeline. This allows publisher pipeline to account for events being dropped by filters, when reporting ACKs. From beats point of view, there should be no difference between dropped events (by filters) and ACKed events (by outputs). Interface mandates all events being properly ACKed by the publisher pipeline.
  • Tags, Fields and beat meta data settings are now implicitely converted to Processors as well
  • -> change ensures, all metadata (local and global) being applied before the
    actual processors (installed with the pipeline) are executed. Ensures all
    processors will see the very same events.
  • introduce (*processors.Processors).RunBC(common.MapStr) common.MapStr. Will be removed, once the beats themselves configure publisher pipeline to run processors (filebeat/metricbeat are executing processors on their own).
  • add Private field to beat.Event for event-based ACK callbacks. Use Private field to store additional event-metadata required for post-processing in the ACK callback (as processor pipeline can change event at will)
  • Prepare moving beats to new pipeline, but introducing ConnectX to BC API, for beats to connect to shared global pipeline (Note: will be removed when BC layer is removed).
  • Processors are still executed by the go-routine calling (beat.Client).Publish.
  • New Processor Execution Order (C=client based processor, P=pipeline based processor)
    1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4)
    2. (P) generalize/normalize event
    3. (P) add beats metadata (name, hostname, version)
    4. (P) add pipeline fields + tags
    5. (C) add client fields + tags
    6. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4)
    7. (C) client processors list
    8. (P) pipeline processors list
    9. (P) (if publish/debug enabled) log event
    10. (P) (if output disabled) dropEvent

For docs: event processing execution order would be nice to have.

@urso urso added the in progress Pull request is currently in progress. label Jun 24, 2017
@urso urso force-pushed the enh/pipeline-processors branch from 789983d to fe75d24 Compare June 26, 2017 12:12
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't know this works, thought _ is required.

@urso urso force-pushed the enh/pipeline-processors branch from fe75d24 to bbe6888 Compare June 30, 2017 12:33
@urso urso added review and removed in progress Pull request is currently in progress. labels Jun 30, 2017
@urso urso changed the title [WIP] Move processors into new pipeline Move processors into new pipeline Jun 30, 2017
@urso urso force-pushed the enh/pipeline-processors branch from 5a8b240 to 526cf2e Compare July 1, 2017 15:36
Copy link
Copy Markdown
Contributor

@ruflin ruflin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Few questions:

  • There are quite a few changes on the JSON part. I wasn't sure how this is related to the other changes?
  • If I understand it right, config wise nothing changes for fields / tags, it's just the internal handling as processor. Also these cannot be configured by a user a processor as they are not registered processors.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No error check?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the error has never been checked for. In this PR I'm trying to adjust interfaces, not fix/improve/change behavior in tests or other pieces.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add an "error" to the event in case a processor did not work so it can be seen in the event, that something went wrong?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Old/Current logic does ignore errors and at most prints a debug message. I don't intend to change any processing logic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should have that somewhere in the docs

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Adding tag.

@urso urso added the needs_docs label Jul 2, 2017
@urso urso force-pushed the enh/pipeline-processors branch from 526cf2e to 96eca70 Compare July 2, 2017 13:26
@urso
Copy link
Copy Markdown
Author

urso commented Jul 2, 2017

There are quite a few changes on the JSON part. I wasn't sure how this is related to the other changes?

There's some shared json parsing logic treating @timestamp and such a little different. In processors we're using beat.Event, but the reader in filebeat is returning common.MapStr. As filebeat is still using the old publisher API, I had to convert to temporary beat.Event type. There is no change in logic.

If I understand it right, config wise nothing changes for fields / tags, it's just the internal handling as processor. Also these cannot be configured by a user a processor as they are not registered processors.

Yep. The pipeline/processor.go defines some internal processors, not globally registered/configurable.

@urso urso force-pushed the enh/pipeline-processors branch from 96eca70 to aab3cb1 Compare July 2, 2017 18:19
@urso urso force-pushed the enh/pipeline-processors branch from aab3cb1 to 0a0bb10 Compare July 2, 2017 22:46
- Update all processors to operate in *beat.Event
- Update processors factory to use `*common.Config`, not `common.Config`.
  `common.Config` works, but go-ucfg interfaces/API assume to operate on
  pointers (passing by value might break functionality in future?)
- Update most processors interface to pass the processors type/context by
  pointer instead of by value (due to use of interfaces, the processors have
  been allocated on heap anyways).
- Install list of processors in new publisher pipeline. This allows publisher
  pipeline to account for events being dropped by filters, when reporting ACKs.
  From beats point of view, there should be no difference between dropped
  events (by filters) and ACKed events (by outputs). Interface mandates all
  events being properly ACKed by the publisher pipeline.
- Tags, Fields and beat meta data settings are now implicitely converted to
  Processors as well
- -> change ensures, all metadata (local and global) being applied before the
  actual processors (installed with the pipeline) are executed. Ensures all
  processors will see the very same events.
- introduce `(*processors.Processors).RunBC(common.MapStr) common.MapStr`. Will
  be removed, once the beats themselves configure publisher pipeline to run
  processors (filebeat/metricbeat are executing processors on their own).
- add `Private` field to `beat.Event` for event-based ACK callbacks. Use
  `Private` field to store additional event-metadata required for post-processing
  in the ACK callback (as processor pipeline can change event at will)
- Prepare moving beats to new pipeline, but introducing `ConnectX` to BC API,
  for beats to connect to shared global pipeline (Note: will be removed when BC
  layer is removed).
- Processors are still executed by the go-routine calling `(beat.Client).Publish`.
- New Processor Execution Order (C=client based processor, P=pipeline based processor)
  1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4)
  2. (P) generalize/normalize event
  3. (P) add beats metadata (name, hostname, version)
  4. (P) add pipeline fields + tags
  5. (C) add client fields + tags
  6. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4)
  7. (C) client processors list
  8. (P) pipeline processors list
  9. (P) (if publish/debug enabled) log event
  10. (P) (if output disabled) dropEvent

client processors higher prio
@urso urso force-pushed the enh/pipeline-processors branch from 0a0bb10 to 6f47a11 Compare July 3, 2017 05:56
@ruflin ruflin merged commit 5ddd685 into elastic:master Jul 3, 2017
@urso urso mentioned this pull request Jul 3, 2017
22 tasks
@dedemorton dedemorton mentioned this pull request Jul 25, 2017
42 tasks
@dedemorton dedemorton mentioned this pull request Dec 14, 2017
37 tasks
@dedemorton
Copy link
Copy Markdown
Contributor

dedemorton commented Jan 22, 2018

I'm removing the needs_docs label because any outstanding doc work required here is tracked by #4598

@urso urso deleted the enh/pipeline-processors branch February 19, 2019 18:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants