aggregators should not re-run processors #7993

@ssoroka

Relevant telegraf.conf:

[[inputs.file]]
  files = ["file.txt"] # contains `data 1`
  data_format = "influx"

[[processors.starlark]]
  # Multiply any float fields by 10
  source = '''
def apply(metric):
    for k, v in metric.fields.items():
        if type(v) == "float":
            metric.fields[k] = v * 10
    return metric
'''

# Keep the aggregate min/max of each metric passing through.
[[aggregators.minmax]]
  period = "30s"
  drop_original = false

Steps to reproduce:

Run Telegraf with a config like the one above, or with any aggregator combined with a non-idempotent processor.

Expected behavior:

The processor runs once, outputting `data 10`.

Actual behavior:

The processor runs twice, outputting `data 100`.

Additional info:

This is long-standing aggregator behavior: aggregators re-run all processors. The original reasoning was probably that you may want to modify the output of aggregators, but it is (1) wasteful and (2) breaks any processor that can't be applied twice to the same field. For example, x * 10 with x = 1 gives 10 after one run but 100 after two; the operation is not idempotent. Adding an aggregator to a config breaks any such processor.
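To make the failure concrete, here is a minimal Python sketch of the double application. It is a toy model, not Telegraf's actual code path: the function names and the field name `value` are assumptions made up for illustration, and `current_behavior` simply models "the processor chain is applied a second time" as described above.

```python
def times_ten(fields):
    """Non-idempotent processor: multiply every float field by 10."""
    return {k: v * 10 if isinstance(v, float) else v for k, v in fields.items()}


def run_processors(fields, processors):
    """Apply each processor in order and return the resulting fields."""
    for processor in processors:
        fields = processor(fields)
    return fields


def expected_behavior(fields, processors):
    # Processors run exactly once per metric.
    return run_processors(fields, processors)


def current_behavior(fields, processors):
    # Toy model (assumption): with an aggregator configured, the same
    # processor chain is applied a second time to the metric.
    processed = run_processors(fields, processors)
    return run_processors(processed, processors)


metric = {"value": 1.0}  # hypothetical field name for `data 1`
once = expected_behavior(metric, [times_ten])
twice = current_behavior(metric, [times_ten])
print(once)   # {'value': 10.0}
print(twice)  # {'value': 100.0}
```

An idempotent processor (for example, one that renames a field or clamps a value) would give the same result in both cases, which is why the bug only shows up with processors like the multiply-by-10 one.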

The solution here would be to convert aggregators into processors themselves. That would let you order plugins freely across both processors and aggregators, and it resolves the problem in a sensible way. Aggregators are a special case of processors, and with the new streaming processor support they fit naturally into the processor model.
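A rough Python sketch of that idea, under stated assumptions: `MinMax`, `run`, and the field name `value` are hypothetical names invented for this example and do not reflect Telegraf's real plugin interfaces. The point it shows is that when the aggregator is just one stage in an ordered pipeline, each metric passes through every stage exactly once.

```python
class MinMax:
    """Toy stand-in for a min/max aggregator written as a pipeline stage."""

    def __init__(self):
        self.lo = {}
        self.hi = {}

    def add(self, fields):
        # Track the running min and max of each field, then pass the
        # original metric through unchanged (like drop_original = false).
        for k, v in fields.items():
            self.lo[k] = min(v, self.lo.get(k, v))
            self.hi[k] = max(v, self.hi.get(k, v))
        return fields

    def flush(self):
        # Emit one aggregate metric at the end of the period.
        out = {}
        for k in self.lo:
            out[k + "_min"] = self.lo[k]
            out[k + "_max"] = self.hi[k]
        return out


def times_ten(fields):
    """Non-idempotent processor: multiply every float field by 10."""
    return {k: v * 10 if isinstance(v, float) else v for k, v in fields.items()}


def run(stages, metrics):
    """Run each metric through the ordered stages exactly once."""
    out = []
    for m in metrics:
        for stage in stages:
            m = stage(m)
        out.append(m)
    return out


agg = MinMax()
# Order matters: the multiply stage runs before the aggregator stage.
passed = run([times_ten, agg.add], [{"value": 1.0}, {"value": 3.0}])
summary = agg.flush()
print(passed)   # [{'value': 10.0}, {'value': 30.0}]
print(summary)  # {'value_min': 10.0, 'value_max': 30.0}
```

In this model, anyone who does want to post-process aggregate output would place a stage after the aggregator rather than relying on the whole chain being re-run.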

Labels

area/agent
area/execd: Issues related to execd or plugins that would be better suited to be used through execd
area/starlark
bug: unexpected problem or unintended behavior
size/l: 1 week or more effort
