PUT _ingest/pipeline/logs-routing
{
  "processors": [
    {
      "pipeline": {
        "name": "logs-ecs-json",
        "if": "def message = ctx.message; return message != null && message.startsWith('{') && message.endsWith('}') && message.contains('\"@timestamp\"') && message.contains('\"ecs') && message.contains('version\"')"
      }
    },
    {
      "script": {
        "source": "ctx.data_stream.dataset = /[\\/*?\"<>|, #:-]/.matcher(ctx.data_stream.dataset).replaceAll('_')",
        "if": "ctx.data_stream?.dataset != null"
      }
    },
    {
      "script": {
        "source": "ctx.data_stream.namespace = /[\\/*?\"<>|, #:]/.matcher(ctx.data_stream.namespace).replaceAll('_')",
        "if": "ctx.data_stream?.namespace != null"
      }
    },
    {
      "set": {
        "field": "data_stream.type",
        "value": "logs",
        "override": false
      }
    },
    {
      "set": {
        "field": "data_stream.dataset",
        "value": "generic",
        "override": false
      }
    },
    {
      "set": {
        "field": "data_stream.namespace",
        "value": "default",
        "override": false
      }
    },
    {
      "set": {
        "field": "event.dataset",
        "copy_from": "data_stream.dataset",
        "override": true
      }
    },
    {
      "set": {
        "field": "_index",
        "value": "logs-{{{data_stream.dataset}}}-{{{data_stream.namespace}}}"
      }
    }
  ]
}
PUT _ingest/pipeline/logs-ecs-json
{
  "processors": [
    {
      "rename": {
        "field": "message",
        "target_field": "_ecs_json_message",
        "ignore_missing": true
      }
    },
    {
      "json": {
        "field": "_ecs_json_message",
        "add_to_root": true,
        "add_to_root_conflict_strategy": "merge",
        "allow_duplicate_keys": true,
        "if": "ctx.containsKey('_ecs_json_message')",
        "on_failure": [
          {
            "rename": {
              "field": "_ecs_json_message",
              "target_field": "message",
              "ignore_missing": true
            }
          },
          {
            "set": {
              "field": "error.message",
              "value": "Error while parsing JSON",
              "override": false
            }
          }
        ]
      }
    },
    {
      "remove": {
        "field": "_ecs_json_message",
        "ignore_missing": true
      }
    },
    {
      "dot_expander": {
        "field": "*",
        "override": true
      }
    },
    {
      "join": {
        "field": "error.stack_trace",
        "separator": "\n",
        "if": "ctx.error?.stack_trace instanceof Collection"
      }
    }
  ]
}
POST _ingest/pipeline/logs-routing/_simulate
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2021-06-22T15:55:00.848Z",
        "agent": {
          "type": "filebeat",
          "version": "7.13.2",
          "hostname": "Felixs-MBP-2.fritz.box",
          "ephemeral_id": "d3706fd0-2f07-465f-8bee-b8fe1a22effe",
          "id": "de46bbbc-bb66-465a-96f5-bc69c7197ae6",
          "name": "Felixs-MBP-2.fritz.box"
        },
        "container": {
          "id": "spring-projects"
        },
        "message": "{\"@timestamp\":\"2021-05-19T22:40:52.169Z\", \"ecs.version\": \"1.2.0\", \"data_stream.dataset\": \"spring-petclinic.log\", \"log.level\": \"WARN\", \"message\":\"HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=4h23s964ms).\", \"service.name\":\"spring-petclinic\",\"process.thread.name\":\"HikariPool-1 housekeeper\",\"log.logger\":\"com.zaxxer.hikari.pool.HikariPool\",\"error.stack_trace\":[\"at foo.bar\", \"\\tat foo.bar\"]}",
        "event": {
          "dataset": "generic"
        },
        "elastic_agent": {
          "id": "f91381a2-a5db-400d-ac02-682ab7a619d5",
          "snapshot": false,
          "version": "7.13.2"
        },
        "log": {
          "offset": 0,
          "file": {
            "path": "/Users/felixbarnsteiner/projects/github/spring-projects/spring-petclinic/logs/app.log.json"
          }
        },
        "input": {
          "type": "log"
        },
        "data_stream": {
          "type": "log",
          "dataset": "generic",
          "namespace": "default"
        },
        "host": {
          "name": "Felixs-MBP-2.fritz.box",
          "hostname": "Felixs-MBP-2.fritz.box",
          "architecture": "x86_64",
          "os": {
            "platform": "darwin",
            "version": "10.16",
            "family": "darwin",
            "name": "Mac OS X",
            "kernel": "20.4.0",
            "build": "20E232",
            "type": "macos"
          },
          "id": "30E396A5-08D7-5AB9-9C39-BD9ED335AC24"
        },
        "ecs": {
          "version": "1.8.0"
        }
      }
    }
  ]
}
To make ingesting ECS JSON logs more native and seamless, we want a user experience that does not require custom JSON parsing or other manual processing configuration. Instead, we should detect whether a log comes in ECS JSON format and parse it accordingly.
We have identified integrations and Elasticsearch ingest node pipelines as the best place to automatically detect and parse ECS JSON logs, for the following reason:
I have implemented some improvements to the ES processors that now make it possible to properly handle ECS JSON logs. I've also created a POC for an ES ingest node pipeline; see the pipeline definitions and the `_simulate` example above.
Eventually, all log-input-type integrations should leverage this pipeline to automatically handle ECS JSON logs.
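As a rough sketch of what that could look like, an integration's ingest pipeline could delegate to the routing pipeline above with a single pipeline processor (the wrapping pipeline name `my-integration-logs` is hypothetical):

```json
PUT _ingest/pipeline/my-integration-logs
{
  "processors": [
    {
      "pipeline": {
        "name": "logs-routing"
      }
    }
  ]
}
```

This keeps the ECS JSON detection logic in one place; integrations only opt in by referencing the shared pipeline.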
If the performance hit of the scripts used in the pipeline turns out to be an issue, we can think about implementing a dedicated processor in Elasticsearch with a pure Java implementation.
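To quantify that overhead, per-pipeline and per-processor timings (invocation counts and cumulative time) are already exposed via the node ingest stats, for example:

```json
GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines
```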
We may want to add an option to the integration settings that lets users opt out of ECS JSON auto-detection in case they're worried about the potential additional overhead. That can work by adding a tag to the events and conditionally executing the ECS pipeline.

Where to start?
The custom logs integration comes to mind first, but it seems there are some dependencies and open questions. As there are already integrations that include pipelines for CloudWatch and Azure logs, those might be a good starting point.
Open questions