Skip to content

[Pipeline tests] Add the ability to ignore fields with dynamic values #169

@marc-gr

Description

@marc-gr

With pipelines that populate some fields dynamically (e.g. event.ingested is populated with _ingest.timestamp), pipeline tests will fail, since the current config options provide no way to ignore these kinds of fields.

It would be nice to add a config option that helps in these situations, for example by ignoring the field completely, or by only checking that the field exists without comparing its value.

An example log line and pipeline that can be used to test this:

auditd.log

type=MAC_IPSEC_EVENT msg=audit(1485893834.891:18877201): op=SPD-delete auid=4294967295 ses=4294967295 res=1 src=192.168.2.0 src_prefixlen=24 dst=192.168.0.0 dst_prefixlen=16

pipeline.yml

---
# Ingest pipeline definition: a description, an ordered list of processors,
# and an on_failure handler at the end of the document.
description: Pipeline for parsing Linux auditd logs
processors:
# Record when the document was processed. The value is taken from
# _ingest.timestamp, so it is populated dynamically rather than from the log.
- set:
    field: event.ingested
    value: '{{_ingest.timestamp}}'
# Split the raw line into record type, optional node, epoch, sequence and the
# remaining key=value text. The patterns are listed from longest to shortest.
- grok:
    field: message
    pattern_definitions:
      AUDIT_TYPE: 'type=%{NOTSPACE:auditd.log.record_type}'
      AUDIT_NODE: 'node=%{IPORHOST:auditd.log.node} '
      AUDIT_PREFIX: '^(?:%{AUDIT_NODE})?%{AUDIT_TYPE} msg=audit\(%{NUMBER:auditd.log.epoch}:%{NUMBER:auditd.log.sequence}\):(%{DATA})?'
      AUDIT_KEY_VALUES: '%{WORD}=%{GREEDYDATA}'
    patterns:
      # Lines that end with old/new auid and old/new ses pairs.
      - >-
        %{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv}
        old auid=%{NUMBER:auditd.log.old_auid}
        new auid=%{NUMBER:auditd.log.new_auid}
        old ses=%{NUMBER:auditd.log.old_ses}
        new ses=%{NUMBER:auditd.log.new_ses}
      # Lines with a quoted msg=... payload, captured separately as sub_kv.
      - "%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv} msg=['\"]%{AUDIT_KEY_VALUES:auditd.log.sub_kv}['\"]"
      - '%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv}'
      - '%{AUDIT_PREFIX}'
      - '%{AUDIT_TYPE} %{AUDIT_KEY_VALUES:auditd.log.kv}'
# Expand the captured key=value text into individual fields under auditd.log.
- kv:
    field: auditd.log.kv
    target_field: auditd.log
    field_split: '\s+'
    value_split: '='
# Same for the quoted msg payload; only one grok pattern captures it, so the
# field may be missing.
- kv:
    field: auditd.log.sub_kv
    target_field: auditd.log
    field_split: '\s+'
    value_split: '='
    ignore_missing: true
# Drop the intermediate capture fields and the original line.
- remove:
    field: auditd.log.kv
    ignore_failure: true
- remove:
    field: auditd.log.sub_kv
    ignore_failure: true
- remove:
    field: message
    ignore_failure: true
# Parse the audit record's epoch into @timestamp, then discard the epoch field.
- date:
    field: auditd.log.epoch
    target_field: '@timestamp'
    formats:
      - UNIX
    ignore_failure: true
- remove:
    field: auditd.log.epoch
    ignore_failure: true
# Store the sequence number and the port fields as longs when present.
- convert:
    field: auditd.log.sequence
    type: long
    ignore_missing: true
- convert:
    field: auditd.log.lport
    type: long
    ignore_missing: true
- convert:
    field: auditd.log.rport
    type: long
    ignore_missing: true
# Normalise the values under auditd.log with a Painless script. For each entry:
#   * a value of "?", "(null)" or the empty string removes the entry;
#   * for keys listed in possibleHexKeys, a value consisting only of hex digits
#     (non-zero, even length) is decoded byte by byte. The decoded text replaces
#     the value only when at least one decoded byte is below 33, or equals 34
#     or 127; otherwise the original hex string is kept. Decoded bytes below 32
#     or equal to 127 are written as '^' followed by the byte XOR 64;
#   * string values lose one leading and one trailing single or double quote;
#   * an arch value of c000003e is replaced by x86_64.
# The two quote characters are supplied to the script through params.
- script:
    lang: painless
    source: |
        String trimQuotes(def singleQuote, def doubleQuote, def v) {
            if (v.startsWith(singleQuote) || v.startsWith(doubleQuote)) {
                v = v.substring(1, v.length());
            }
            if (v.endsWith(singleQuote) || v.endsWith(doubleQuote)) {
                v = v.substring(0, v.length()-1);
            }
            return v;
        }

        boolean isHexAscii(String v) {
            def len = v.length();

            if (len == 0 || len % 2 != 0) {
                return false;
            }

            for (int i = 0 ; i < len ; i++) {
                if (Character.digit(v.charAt(i), 16) == -1) {
                    return false;
                }
            }
            return true;
        }

        String convertHexToString(String hex) {
            StringBuilder sb = new StringBuilder();
            boolean needed_encoding = false;

            for (int i=0; i < hex.length() - 1; i+=2) {
                int cp = Integer.parseInt(hex.substring(i, (i +2)), 16);
                if (cp < 33 || cp == 34 || cp == 127) {
                    needed_encoding = true;
                }
                if (cp < 32 || cp == 127) {
                    sb.append('^');
                    cp ^= 64;
                }
                sb.append((char)cp);
            }
            if (needed_encoding) {
                return sb.toString();
            }
            return hex;
        }

        def possibleHexKeys = ["exe", "cmd", "data", "path", "comm", "file", "name", "watch", "cwd", "acct", "dir", "vm", "old-chardev", "new-chardev", "old-disk", "new-disk", "old-fs", "new-fs", "old-net", "new-net", "device", "cgroup", "apparmor", "operation", "denied_mask", "info", "profile", "requested_mask", "old-rng", "new-rng", "ocomm", "grp", "new_group", "invalid_context", "sw", "root_dir", "proctitle"];
        def audit = ctx.auditd.get("log");
        Iterator entries = audit.entrySet().iterator();

        while (entries.hasNext()) {
            def e = entries.next();
            def k = e.getKey();
            def v = e.getValue();

            // Remove entries whose value is ?
            if (v == "?" || v == "(null)" || v == "") {
                entries.remove();
                continue;
            }

            // Convert hex values to ASCII.
            if (possibleHexKeys.contains(k) && isHexAscii(v)) {
                v = convertHexToString(v);
                audit.put(k, v);
            }

            // Trim quotes.
            if (v instanceof String) {
                v = trimQuotes(params.single_quote, params.double_quote, v);
                audit.put(k, v);
            }

            // Convert arch.
            if (k == "arch" && v == "c000003e") {
                audit.put(k, "x86_64");
            }
        }
    params:
      single_quote: "'"
      double_quote: "\""
# Every document gets event.kind = event.
- set:
    field: event.kind
    value: event
# The remaining processors come in pairs sharing one condition on the record
# type: the first sets event.category, the second sets event.type.
# USER_AUTH -> authentication / info
- set:
    if: "ctx.auditd.log?.record_type == 'USER_AUTH'"
    field: event.category
    value: authentication
- set:
    if: "ctx.auditd.log?.record_type == 'USER_AUTH'"
    field: event.type
    value: info
# KERN_MODULE -> driver / info
- set:
    if: "ctx.auditd.log?.record_type == 'KERN_MODULE'"
    field: event.category
    value: driver
- set:
    if: "ctx.auditd.log?.record_type == 'KERN_MODULE'"
    field: event.type
    value: info
# SOFTWARE_UPDATE -> package / info
- set:
    if: "ctx.auditd.log?.record_type == 'SOFTWARE_UPDATE'"
    field: event.category
    value: package
- set:
    if: "ctx.auditd.log?.record_type == 'SOFTWARE_UPDATE'"
    field: event.type
    value: info
# SYSTEM_BOOT or SYSTEM_SHUTDOWN -> host / info
- set:
    if: >-
      ctx.auditd.log?.record_type == 'SYSTEM_BOOT' ||
      ctx.auditd.log?.record_type == 'SYSTEM_SHUTDOWN'
    field: event.category
    value: host
- set:
    if: >-
      ctx.auditd.log?.record_type == 'SYSTEM_BOOT' ||
      ctx.auditd.log?.record_type == 'SYSTEM_SHUTDOWN'
    field: event.type
    value: info
# SYSCALL with syscall execve -> process / info
- set:
    if: >-
      ctx.auditd.log?.record_type == 'SYSCALL' &&
      ctx.auditd.log?.syscall == 'execve'
    field: event.category
    value: process
- set:
    if: >-
      ctx.auditd.log?.record_type == 'SYSCALL' &&
      ctx.auditd.log?.syscall == 'execve'
    field: event.type
    value: info
# VIRT_CONTROL and VIRT_MACHINE_ID records are both categorised as host.
- set:
    if: >-
      ctx.auditd.log?.record_type == 'VIRT_CONTROL' ||
      ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'
    field: event.category
    value: host
# For VIRT_CONTROL the event.type follows the op value:
# start -> start, stop -> end, create -> creation, delete -> deletion.
- set:
    if: >-
      ctx.auditd.log?.record_type == 'VIRT_CONTROL' &&
      ctx.auditd.log?.op == 'start'
    field: event.type
    value: start
- set:
    if: >-
      ctx.auditd.log?.record_type == 'VIRT_CONTROL' &&
      ctx.auditd.log?.op == 'stop'
    field: event.type
    value: end
- set:
    if: >-
      ctx.auditd.log?.record_type == 'VIRT_CONTROL' &&
      ctx.auditd.log?.op == 'create'
    field: event.type
    value: creation
- set:
    if: >-
      ctx.auditd.log?.record_type == 'VIRT_CONTROL' &&
      ctx.auditd.log?.op == 'delete'
    field: event.type
    value: deletion
# VIRT_MACHINE_ID is always a creation; its vm and virt values are copied into
# container.name and container.runtime unless they render empty.
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'"
    field: event.type
    value: creation
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'"
    field: container.name
    value: '{{ auditd.log.vm }}'
    ignore_empty_value: true
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'"
    field: container.runtime
    value: '{{ auditd.log.virt }}'
    ignore_empty_value: true
# Move auditd fields to host.* and user.* names. Every rename here sets
# ignore_failure, so a failing rename does not stop the pipeline.
- rename:
    field: auditd.log.arch
    target_field: host.architecture
    ignore_failure: true
# acct and user both target user.name; acct is processed first.
- rename:
    field: auditd.log.acct
    target_field: user.name
    ignore_failure: true
- rename:
    field: auditd.log.user
    target_field: user.name
    ignore_failure: true
- rename:
    field: auditd.log.uid
    target_field: user.id
    ignore_failure: true
- rename:
    field: auditd.log.gid
    target_field: user.group.id
    ignore_failure: true
# Audit ids.
- rename:
    field: auditd.log.agid
    target_field: user.audit.group.id
    ignore_failure: true
- rename:
    field: auditd.log.auid
    target_field: user.audit.id
    ignore_failure: true
# Filesystem ids.
- rename:
    field: auditd.log.fsgid
    target_field: user.filesystem.group.id
    ignore_failure: true
- rename:
    field: auditd.log.fsuid
    target_field: user.filesystem.id
    ignore_failure: true
# Effective ids.
- rename:
    field: auditd.log.egid
    target_field: user.effective.group.id
    ignore_failure: true
- rename:
    field: auditd.log.euid
    target_field: user.effective.id
    ignore_failure: true
# Saved ids.
- rename:
    field: auditd.log.sgid
    target_field: user.saved.group.id
    ignore_failure: true
- rename:
    field: auditd.log.suid
    target_field: user.saved.id
    ignore_failure: true
# Owner ids.
- rename:
    field: auditd.log.ogid
    target_field: user.owner.group.id
    ignore_failure: true
- rename:
    field: auditd.log.ouid
    target_field: user.owner.id
    ignore_failure: true
# Move auditd fields to process.* names and give the numeric ones a long type.
- rename:
    field: auditd.log.comm
    target_field: process.name
    ignore_failure: true
- rename:
    field: auditd.log.exe
    target_field: process.executable
    ignore_failure: true
- rename:
    field: auditd.log.pid
    target_field: process.pid
    ignore_failure: true
- rename:
    field: auditd.log.ppid
    target_field: process.ppid
    ignore_failure: true
- convert:
    field: process.pid
    type: long
    ignore_missing: true
- convert:
    field: process.ppid
    type: long
    ignore_missing: true
# cmd becomes process.args, split on whitespace into a list.
- rename:
    field: auditd.log.cmd
    target_field: process.args
    ignore_failure: true
- split:
    field: process.args
    separator: '\s+'
    ignore_failure: true
# args_count starts from argc; when process.args is a list, the script below
# overwrites it with the list length.
- rename:
    field: auditd.log.argc
    target_field: process.args_count
    ignore_failure: true
- script:
    lang: painless
    if: "ctx?.process?.args != null"
    source: |-
      if (ctx.process.args instanceof List) {
        ctx.process.args_count = ctx.process.args.length;
      }
- convert:
    field: process.args_count
    type: long
    ignore_missing: true
- rename:
    field: auditd.log.exit
    target_field: process.exit_code
    ignore_failure: true
- convert:
    field: process.exit_code
    type: long
    ignore_missing: true
# Unlike the renames above, this one sets ignore_missing, not ignore_failure.
- rename:
    field: auditd.log.cwd
    target_field: process.working_directory
    ignore_missing: true
# Remaining renames: terminal, message, outcome, action and the two addresses.
- rename:
    field: auditd.log.terminal
    target_field: user.terminal
    ignore_failure: true
- rename:
    field: auditd.log.msg
    target_field: message
    ignore_failure: true
- rename:
    field: auditd.log.res
    target_field: event.outcome
    ignore_failure: true
# The record type becomes event.action, lowercased.
- rename:
    field: auditd.log.record_type
    target_field: event.action
    ignore_failure: true
- lowercase:
    field: event.action
    ignore_failure: true
- rename:
    field: auditd.log.src
    target_field: source.address
    ignore_failure: true
- rename:
    field: auditd.log.dst
    target_field: destination.address
    ignore_failure: true
# Populate source.ip only when source.address is, in full, an IP address.
- grok:
    field: source.address
    patterns:
      - '^%{IP:source.ip}$'
    ignore_failure: true
# Geo lookup for the source IP, written to source.geo.
- geoip:
    field: source.ip
    target_field: source.geo
    ignore_failure: true
# ASN lookup from GeoLite2-ASN.mmdb, written to source.as and then renamed to
# source.as.number and source.as.organization.name.
- geoip:
    field: source.ip
    target_field: source.as
    database_file: GeoLite2-ASN.mmdb
    properties:
      - asn
      - organization_name
    ignore_missing: true
- rename:
    field: source.as.asn
    target_field: source.as.number
    ignore_missing: true
- rename:
    field: source.as.organization_name
    target_field: source.as.organization.name
    ignore_missing: true
# Pipeline-level failure handler: store the failure message in error.message.
on_failure:
  - set:
      field: error.message
      value: '{{ _ingest.on_failure_message }}'

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions