Skip to content

[Filebeat Azure Module] Ingest pipelines fails when there is an escape character in the logs #20797

@felix-lessoer

Description

@felix-lessoer

For confirmed bugs, please report:

  • Version: 7.9.0
  • Operating System: any
  • Discuss Forum URL:
  • Steps to Reproduce:

1.) Collect azure activitylogs with the filebeat azure module
2.) There are some edge cases that result into failures for the ingest pipeline.
2.a) In my example I had \e characters in the log file which resulted into the error
"Illegal unquoted character ((CTRL-CHAR, code 27)): has to be escaped using backslash to be included in string value\n at [Source: (org.elasticsearch.common.bytes.AbstractBytesReference$MarkSupportingStreamInputWrapper); line: 1, column: 2848]"

2.b) Also the source.ip field is not always filled

I solved the issue by adjusting the ingest pipeline a bit.
1.) Added gsub to replace \e chars
2.) Added ignore mssing to the second geo ip fileter for source.ip

This is the working version of the pipeline:

PUT _ingest/pipeline/filebeat-7.9.0-azure-activitylogs-pipeline
{
  "description": "Pipeline for parsing azure activity logs.",
  "processors": [
    {
      "gsub": {
        "field": "message",
        "pattern": """\e""",
        "replacement": ""
      }
    },
    {
      "rename": {
        "field": "azure",
        "target_field": "azure-eventhub",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "source": "ctx.message = ctx.message.replace(params.empty_field_name, '')",
        "params": {
          "empty_field_name": "\"\":\"\","
        },
        "ignore_failure": true
      }
    },
    {
      "json": {
        "field": "message",
        "target_field": "azure.activitylogs"
      }
    },
    {
      "date": {
        "field": "azure.activitylogs.time",
        "target_field": "@timestamp",
        "ignore_failure": true,
        "formats": [
          "ISO8601"
        ]
      }
    },
    {
      "remove": {
        "field": [
          "message",
          "azure.activitylogs.time"
        ],
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.resourceId",
        "target_field": "azure.resource_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.callerIpAddress",
        "target_field": "source.ip",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.level",
        "target_field": "log.level"
      }
    },
    {
      "rename": {
        "target_field": "event.duration",
        "ignore_missing": true,
        "field": "azure.activitylogs.durationMs"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.event.duration!= null) {ctx.event.duration = ctx.event.duration * params.param_nano;}",
        "params": {
          "param_nano": 1000000
        },
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.location",
        "target_field": "geo.name",
        "ignore_missing": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx?.azure?.activitylogs?.properties?.eventCategory != null) {\n  ctx.azure.activitylogs.event_category = ctx.azure.activitylogs.properties.eventCategory;\n} else if (ctx?.azure?.activitylogs?.properties?.policies != null)  {\n  ctx.azure.activitylogs.event_category = 'Policy';\n} else {\n  ctx.azure.activitylogs.event_category = 'Administrative';\n}",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.resultType",
        "target_field": "azure.activitylogs.result_type",
        "ignore_missing": true
      }
    },
    {
      "convert": {
        "field": "azure.activitylogs.result_type",
        "target_field": "event.outcome",
        "type": "string",
        "if": "ctx?.azure?.activitylogs?.resultType != null && ctx.azure.activitylogs.resultType instanceof String && (ctx.azure.activitylogs.resultType.toLowerCase() == 'success' || ctx.azure.activitylogs.resultType.toLowerCase() == 'failure')"
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.operation_name",
        "ignore_missing": true,
        "field": "azure.activitylogs.operationName"
      }
    },
    {
      "convert": {
        "ignore_missing": true,
        "field": "azure.activitylogs.operation_name",
        "target_field": "event.action",
        "type": "string"
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.resultSignature",
        "target_field": "azure.activitylogs.result_signature",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.authorization.evidence.roleAssignmentScope",
        "target_field": "azure.activitylogs.identity.authorization.evidence.role_assignment_scope"
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.identity.authorization.evidence.roleDefinitionId",
        "target_field": "azure.activitylogs.identity.authorization.evidence.role_definition_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.identity.authorization.evidence.roleAssignmentId",
        "target_field": "azure.activitylogs.identity.authorization.evidence.role_assignment_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "azure.activitylogs.identity.authorization.evidence.principalId",
        "target_field": "azure.activitylogs.identity.authorization.evidence.principal_id",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.identity.authorization.evidence.principal_type",
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.authorization.evidence.principalType"
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.correlationId",
        "target_field": "azure.correlation_id"
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.properties.service_request_id",
        "ignore_missing": true,
        "field": "azure.activitylogs.properties.serviceRequestId"
      }
    },
    {
      "rename": {
        "target_field": "message",
        "ignore_missing": true,
        "field": "azure.activitylogs.properties.statusMessage"
      }
    },
    {
      "rename": {
        "ignore_missing": true,
        "field": "azure.activitylogs.properties.statusCode",
        "target_field": "azure.activitylogs.properties.status_code"
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "geo",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "target_field": "azure.activitylogs.identity.claims_initiated_by_user.fullname",
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.claims.name"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'] != null) {\n  ctx.azure.activitylogs.identity.claims_initiated_by_user.surname = ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'];\n}",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name'] != null) {\n  ctx.azure.activitylogs.identity.claims_initiated_by_user.name = ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name'];\n}",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'] != null) {\n  ctx.azure.activitylogs.identity.claims_initiated_by_user.givenname = ctx.azure.activitylogs.identity.claims['http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'];\n}",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "if": "ctx.azure.activitylogs.identity!= null && ctx.azure.activitylogs.identity.claims_initiated_by_user != null && ctx.azure.activitylogs.identity.claims_initiated_by_user.name != null",
        "field": "azure.activitylogs.identity.claims_initiated_by_user.schema",
        "value": "http://schemas.xmlsoap.org/ws/2005/05/identity/claims",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "if (ctx.azure.activitylogs.identity.claims != null) {\n  ctx.temp_claims = new HashMap();\n  for (String key : ctx.azure.activitylogs.identity.claims.keySet()) {\n    ctx.temp_claims[key.replace('.', '_')] = ctx.azure.activitylogs.identity.claims.get(key);\n  }\n  ctx.azure.activitylogs.identity.claims = ctx.temp_claims; ctx.remove('temp_claims');\n}",
        "ignore_failure": true
      }
    },
    {
      "script": {
        "lang": "painless",
        "ignore_failure": true,
        "params": {
          "write": {
            "type": [
              "change"
            ]
          },
          "read": {
            "type": [
              "access"
            ]
          },
          "delete": {
            "type": [
              "deletion"
            ]
          },
          "action": {
            "type": [
              "change"
            ]
          }
        },
        "source": "if (ctx?.azure?.activitylogs?.category == null) {\n  return;\n} def hm = new HashMap(params.get(ctx.azure.activitylogs.category.toLowerCase())); hm.forEach((k, v) -> ctx.event[k] = v);"
      }
    },
    {
      "geoip": {
        "field": "source.ip",
        "target_field": "source.geo",
        "ignore_missing": true
      }
    },
    {
      "geoip": {
        "database_file": "GeoLite2-ASN.mmdb",
        "field": "source.ip",
        "target_field": "source.as",
        "properties": [
          "asn",
          "organization_name"
        ],
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "target_field": "source.as.number",
        "ignore_missing": true,
        "field": "source.as.asn"
      }
    },
    {
      "rename": {
        "field": "source.as.organization_name",
        "target_field": "source.as.organization.name",
        "ignore_missing": true
      }
    },
    {
      "grok": {
        "ignore_missing": true,
        "field": "azure.activitylogs.identity.claims_initiated_by_user.name",
        "patterns": [
          "%{USERNAME:user.name}@%{HOSTNAME:user.domain}"
        ]
      }
    },
    {
      "convert": {
        "field": "azure.activitylogs.identity.claims_initiated_by_user.fullname",
        "target_field": "user.full_name",
        "type": "string",
        "ignore_missing": true
      }
    },
    {
      "set": {
        "value": "event",
        "field": "event.kind"
      }
    },
    {
      "pipeline": {
        "name": "filebeat-7.9.0-azure-activitylogs-azure-shared-pipeline"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Metadata

Metadata

Assignees

Labels

Team:PlatformsLabel for the Integrations - Platforms team

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions