Add support for parsers in filestream input #24763
Conversation
Pinging @elastic/agent (Team:Agent)
💚 Build Succeeded
urso
left a comment
In general the approach looks good to me. I assume syslog and CRI-O will be added next?
Have you tried JSON in JSON or ndjson -> multiline, as is common in the case of containers?
Most of the time? Do you see any potential limitations that don't exist in comparison to the logs input?
JSON parsing is tricky in the `filestream` input. The main problem is that the JSON reader only parses the JSON fields and puts them under the `json` key.
I see. Not sure how much postprocessing will be required, but maybe we can introduce a generic helper function/method that can do the conversion. Maybe we also need to enhance the readers.
filebeat/input/filestream/input.go
Outdated
For the complete function body up until here, I wonder if this could be a separate parser that we just append to the list of parsers. In that case the beat.Event could be extracted from the message directly.
filebeat/input/filestream/input.go
Outdated
Why do we need to build and apply the post processors separately? Couldn't we just ensure that the parser correctly modifies the message? Are there fields that must be 'set' before the post processing is run?
Yes, the fields `log.offset` and `log.file.path` are set before the input merges the JSON fields with the existing fields.
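To make that ordering concrete, here is a minimal sketch (hypothetical helper names and simplified merge logic, not the actual Beats code) of an input that sets `log.offset` and `log.file.path` first and only then merges the parsed JSON fields:

```python
import json

def build_event(line, offset, path, overwrite_keys=False):
    # The input sets log.offset and log.file.path first...
    event = {
        "log": {"offset": offset, "file": {"path": path}},
        "message": line,
    }
    # ...and only afterwards merges the parsed JSON fields into it.
    for key, value in json.loads(line).items():
        if key not in event or overwrite_keys:
            event[key] = value
    return event
```

With `overwrite_keys` disabled, a log line that itself contains a `log` field cannot clobber the offset and path the input just set.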
filebeat/input/filestream/input.go
Outdated
There was a problem hiding this comment.
I wonder if we can/should inject a processor at the beginning? E.g. if we have filebeat collect the output of another filebeat, shall we really overwrite the existing fields with the new ones we define here, or should we make an attempt to keep the original fields from the original log file intact if they are present?
Wow, these are quite a few integration tests. Having a few integration tests is indeed important, as we combine quite a few components.
But as the focus is on the parsers, it would be nice to have more unit tests with parsers only, instead of full-fledged integration tests that operate on the disk.
I understand that most of these tests are copied from the existing system tests and it is better to keep them as is. But maybe we can create a follow-up issue to clean the tests up a little.
libbeat/reader/readjson/json.go
Outdated
Which value does `MessageKey` have, and what do we store in here if we didn't run multiline?
`MessageKey` is only needed if multiline is configured before the JSON parser.
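For context, a configuration where multiline runs before the JSON parser, so that `message_key` tells the JSON reader which field holds the aggregated text, might look like this (a hypothetical sketch, not taken from this PR):

```yaml
parsers:
  - multiline:
      type: pattern
      pattern: '^{'
      negate: true
      match: after
  - ndjson:
      message_key: message
```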
This pull request is now in conflict. Could you fix it? 🙏
```go
var config readjson.Config
cfg := ns.Config()
cfg.Unpack(&config)
pp = append(pp, readjson.NewJSONPostProcessor(&config))
```
What happens to post processing if I have multiple JSON parsers? E.g. json => multiline => json?
In this case, multiple post processors are added. It means that if e.g. `keys_under_root` or `overwrite_keys` is enabled for both JSON parsers, the last parser "wins".
I'm more curious what happens to the event if one has `keys_under_root` disabled and one has it enabled.
E.g. let's assume I have the configuration:

```yaml
parsers:
  - ndjson.fields_under_root: true
  - ndjson.fields_under_root: false
```

with this event:

```json
{"log.level": "debug", "message": "{\"id\": 5}"}
```

then given the parsers I would assume my event will look like:

```json
{
  "log.level": "debug",
  "json": { "id": 5 }
}
```

But what will actually be produced is:

```json
{
  "log.level": "debug",
  "id": 5
}
```

Because the post processor removes `json` from the event, it is not the first one that wins, but the last one... in this case.
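A toy model (hypothetical names, not the actual Beats implementation) that reproduces the surprising output above: each post-processor consumes the intermediate `json` key, so the final shape of the event depends on the order in which the processors run rather than on any single parser's setting.

```python
def post_process(event, fields_under_root):
    # Each post-processor consumes the intermediate "json" key, if present.
    fields = event.pop("json", None)
    if fields is None:
        return event  # an earlier processor already consumed it
    if fields_under_root:
        event.update(fields)  # merge parsed fields at the top level
    else:
        event["json"] = fields  # keep them under "json"
    return event

event = {"log.level": "debug", "json": {"id": 5}}
# parsers configured as fields_under_root: true, then fields_under_root: false
post_process(event, True)
post_process(event, False)
# "id" ends up at the top level and "json" is gone
```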
Looks good for now. But unfortunately we are not off the hook yet. In order to support Docker JSON logs or syslog parsing in the future, we have to try to make the parsers more self-contained, without the need for post processing. Maybe we have to 'fork' the current readers and create new implementations for the filestream input. The current readers we have are bound to the logs input only, anyway.
This PR adds support for parsers in the `filestream` input.
Example configuration that aggregates five lines into a single event and parses the JSON contents:
```yaml
- type: filestream
enabled: true
paths:
- test.log
parsers:
- multiline:
type: count
count_lines: 5
skip_newline: true
- ndjson:
fields_under_root: true
```
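A rough sketch of what the two parsers above do conceptually (simplified Python, not the actual Go implementation): group every five lines into one message, then parse that message as JSON.

```python
import json

def run_parsers(lines, count_lines=5, skip_newline=True, fields_under_root=True):
    # multiline (type: count): aggregate every count_lines lines into one message
    sep = "" if skip_newline else "\n"
    events = []
    for i in range(0, len(lines), count_lines):
        events.append({"message": sep.join(lines[i:i + count_lines])})
    # ndjson: parse each aggregated message as JSON
    for event in events:
        fields = json.loads(event["message"])
        if fields_under_root:
            event.update(fields)  # merge parsed keys at the top level
        else:
            event["json"] = fields  # keep parsed keys under "json"
    return events
```

For example, a pretty-printed JSON object spanning five lines is first joined into a single message and then decoded into top-level fields.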
(cherry picked from commit 30331bc)