I solved the issue by adjusting the ingest pipeline a bit.
1.) Added a gsub processor to strip \e (escape) characters from the message field
2.) Added ignore_missing to the second geoip processor for source.ip
PUT _ingest/pipeline/filebeat-7.9.0-azure-activitylogs-pipeline
{
  "description": "Pipeline for parsing azure activity logs.",
  "processors": [
    {
      "gsub": {
        "field": "message",
        "pattern": "\\e",
        "replacement": ""
      }
    },
    {
      "rename": {
        "field": "azure",
        "target_field": "azure-eventhub",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "source": "ctx.message = ctx.message.replace(params.empty_field_name, '')",
        "params": {
          "empty_field_name": "\"\":\"\","
        },
        "ignore_failure": true
      }
    },
    {
      "json": {
        "field": "message",
        "target_field": "azure.activitylogs"
      }
    },
    {
      "date": {
        "field": "azure.activitylogs.time",
        "target_field": "@timestamp",
        "ignore_failure": true,
        "formats": [
          "ISO8601"
        ]
      }
    },
    {
      "remove": {
        "field": [
          "message",
          "azure.activitylogs.time"
        ],
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.resourceId",
        "target_field": "azure.resource_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.callerIpAddress",
        "target_field": "source.ip",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.level",
        "target_field": "log.level"
      }
    },
    {
      "rename": {
        "target_field": "event.duration",
        "ignore_missing": true,
        "field": "azure.activitylogs.durationMs"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.event.duration!= null) {ctx.event.duration = ctx.event.duration * params.param_nano;}",
        "params": {
          "param_nano": 1000000
        },
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.location",
        "target_field": "geo.name",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx?.azure?.activitylogs?.properties?.eventCategory != null) {\n ctx.azure.activitylogs.event_category = ctx.azure.activitylogs.properties.eventCategory;\n} else if (ctx?.azure?.activitylogs?.properties?.policies != null) {\n ctx.azure.activitylogs.event_category = 'Policy';\n} else {\n ctx.azure.activitylogs.event_category = 'Administrative';\n}",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.resultType",
        "target_field": "azure.activitylogs.result_type",
        "ignore_missing": true
      }
    },
    {
      "convert": {
        "field": "azure.activitylogs.result_type",
        "target_field": "event.outcome",
        "type": "string",
        "if": "ctx?.azure?.activitylogs?.result_type != null && ctx.azure.activitylogs.result_type instanceof String && (ctx.azure.activitylogs.result_type.toLowerCase() == 'success' || ctx.azure.activitylogs.result_type.toLowerCase() == 'failure')"
      }
    },
    {
      "lowercase": {
        "field": "event.outcome",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.operation_name",
        "ignore_missing": true,
        "field": "azure.activitylogs.operationName"
      }
    },
    {
      "convert": {
        "ignore_missing": true,
        "field": "azure.activitylogs.operation_name",
        "target_field": "event.action",
        "type": "string"
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.resultSignature",
        "target_field": "azure.activitylogs.result_signature",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.authorization.evidence.roleAssignmentScope",
        "target_field": "azure.activitylogs.identity.authorization.evidence.role_assignment_scope"
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.identity.authorization.evidence.roleDefinitionId",
        "target_field": "azure.activitylogs.identity.authorization.evidence.role_definition_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.identity.authorization.evidence.roleAssignmentId",
        "target_field": "azure.activitylogs.identity.authorization.evidence.role_assignment_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.identity.authorization.evidence.principalId",
        "target_field": "azure.activitylogs.identity.authorization.evidence.principal_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.identity.authorization.evidence.principal_type",
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.authorization.evidence.principalType"
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.correlationId",
        "target_field": "azure.correlation_id"
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.properties.service_request_id",
        "ignore_missing": true,
        "field": "azure.activitylogs.properties.serviceRequestId"
      }
    },
    {
      "rename": {
        "target_field": "message",
        "ignore_missing": true,
        "field": "azure.activitylogs.properties.statusMessage"
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.properties.statusCode",
        "target_field": "azure.activitylogs.properties.status_code"
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "geo",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.identity.claims_initiated_by_user.fullname",
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.claims.name"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'] != null) {\n ctx.azure.activitylogs.identity.claims_initiated_by_user.surname = ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'];\n}",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name'] != null) {\n ctx.azure.activitylogs.identity.claims_initiated_by_user.name = ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name'];\n}",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'] != null) {\n ctx.azure.activitylogs.identity.claims_initiated_by_user.givenname = ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'];\n}",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "if": "ctx.azure.activitylogs.identity!= null && ctx.azure.activitylogs.identity.claims_initiated_by_user != null && ctx.azure.activitylogs.identity.claims_initiated_by_user.name != null",
        "field": "azure.activitylogs.identity.claims_initiated_by_user.schema",
        "value": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims != null) {\n ctx.temp_claims = new HashMap();\n for (String key : ctx.azure.activitylogs.identity.claims.keySet()) {\n ctx.temp_claims[key.replace('.', '_')] = ctx.azure.activitylogs.identity.claims.get(key);\n }\n ctx.azure.activitylogs.identity.claims = ctx.temp_claims; ctx.remove('temp_claims');\n}",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "ignore_failure": true,
        "params": {
          "write": {
            "type": [
              "change"
            ]
          },
          "read": {
            "type": [
              "access"
            ]
          },
          "delete": {
            "type": [
              "deletion"
            ]
          },
          "action": {
            "type": [
              "change"
            ]
          }
        },
        "source": "if (ctx?.azure?.activitylogs?.category == null) {\n return;\n} def hm = new HashMap(params.get(ctx.azure.activitylogs.category.toLowerCase())); hm.forEach((k, v) -> ctx.event[k] = v);"
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "source.geo",
        "ignore_missing": true
      }
    },
    {
      "geoip": {
        "database_file": "GeoLite2-ASN.mmdb",
        "field": "source.ip",
        "target_field": "source.as",
        "properties": [
          "asn",
          "organization_name"
        ],
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "target_field": "source.as.number",
        "ignore_missing": true,
        "field": "source.as.asn"
      }
    },
    {
      "rename": {
        "field": "source.as.organization_name",
        "target_field": "source.as.organization.name",
        "ignore_missing": true
      }
    },
    {
      "grok": {
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.claims_initiated_by_user.name",
        "patterns": [
          "%{USERNAME:user.name}@%{HOSTNAME:user.domain}"
        ]
      }
    },
    {
      "convert": {
        "field": "azure.activitylogs.identity.claims_initiated_by_user.fullname",
        "target_field": "user.full_name",
        "type": "string",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "value": "event",
        "field": "event.kind"
      }
    },
    {
      "pipeline": {
        "name": "filebeat-7.9.0-azure-activitylogs-azure-shared-pipeline"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}
For confirmed bugs, please report:
1.) Collect azure activitylogs with the filebeat azure module
2.) There are some edge cases that result in failures in the ingest pipeline.
2.a) In my example I had \e characters in the log file, which resulted in the error
"Illegal unquoted character ((CTRL-CHAR, code 27)): has to be escaped using backslash to be included in string value\n at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper); line: 1, column: 2848]"
2.b) Also the source.ip field is not always filled
I solved the issue by adjusting the ingest pipeline a bit.
1.) Added a gsub processor to strip \e (escape) characters from the message field
2.) Added ignore_missing to the second geoip processor for source.ip
This is the working version of the pipeline: