With pipelines that populate some fields dynamically (e.g. `event.ingested` is populated with `_ingest.timestamp`), pipeline tests will fail, since the current config options offer no way to ignore these kinds of fields.
It would be nice to add a config option that helps in these situations, for example by ignoring the field completely, or by only checking that it exists without comparing its value.
An example log line and pipeline that can be used to test this:
auditd.log
type=MAC_IPSEC_EVENT msg=audit(1485893834.891:18877201): op=SPD-delete auid=4294967295 ses=4294967295 res=1 src=192.168.2.0 src_prefixlen=24 dst=192.168.0.0 dst_prefixlen=16
pipeline.yml
---
description: Pipeline for parsing Linux auditd logs
processors:
# Record the ingest timestamp on every document.
- set:
    field: event.ingested
    value: '{{_ingest.timestamp}}'
# Parse the audit header (optional node, record type, epoch, sequence) and
# capture the remaining key=value text. Several alternative layouts are tried.
- grok:
    field: message
    pattern_definitions:
      AUDIT_TYPE: "type=%{NOTSPACE:auditd.log.record_type}"
      AUDIT_NODE: "node=%{IPORHOST:auditd.log.node} "
      AUDIT_PREFIX: "^(?:%{AUDIT_NODE})?%{AUDIT_TYPE} msg=audit\\(%{NUMBER:auditd.log.epoch}:%{NUMBER:auditd.log.sequence}\\):(%{DATA})?"
      AUDIT_KEY_VALUES: "%{WORD}=%{GREEDYDATA}"
    patterns:
    # Multi-line double-quoted scalar: YAML folds each line break into a
    # single space, so this is one pattern.
    - "%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv} old auid=%{NUMBER:auditd.log.old_auid}
      new auid=%{NUMBER:auditd.log.new_auid} old ses=%{NUMBER:auditd.log.old_ses}
      new ses=%{NUMBER:auditd.log.new_ses}"
    - "%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv} msg=['\"]%{AUDIT_KEY_VALUES:auditd.log.sub_kv}['\"]"
    - "%{AUDIT_PREFIX} %{AUDIT_KEY_VALUES:auditd.log.kv}"
    - "%{AUDIT_PREFIX}"
    - "%{AUDIT_TYPE} %{AUDIT_KEY_VALUES:auditd.log.kv}"
# Expand the captured key=value text into individual fields under auditd.log.
- kv:
    field: auditd.log.kv
    field_split: "\\s+"
    value_split: "="
    target_field: auditd.log
# Same expansion for the nested msg='...' payload, which is not always present.
- kv:
    field: auditd.log.sub_kv
    field_split: "\\s+"
    value_split: "="
    target_field: auditd.log
    ignore_missing: true
# Drop the intermediate capture fields and the raw message.
- remove:
    field: auditd.log.kv
    ignore_failure: true
- remove:
    field: auditd.log.sub_kv
    ignore_failure: true
- remove:
    field: message
    ignore_failure: true
# Use the audit record's epoch value (UNIX format) as the event timestamp.
- date:
    field: auditd.log.epoch
    target_field: "@timestamp"
    formats:
    - UNIX
    ignore_failure: true
- remove:
    field: auditd.log.epoch
    ignore_failure: true
# Store numeric audit fields as longs.
- convert:
    field: auditd.log.sequence
    type: long
    ignore_missing: true
- convert:
    field: auditd.log.lport
    type: long
    ignore_missing: true
- convert:
    field: auditd.log.rport
    type: long
    ignore_missing: true
# Clean up the values under auditd.log:
#   * remove entries whose value is "?", "(null)" or empty,
#   * for the keys in possibleHexKeys, decode values that look like hex,
#   * strip one leading and one trailing single/double quote from strings,
#   * rewrite arch "c000003e" to "x86_64".
# The quote characters are passed in through params.
- script:
    lang: painless
    source: |
      String trimQuotes(def singleQuote, def doubleQuote, def v) {
        if (v.startsWith(singleQuote) || v.startsWith(doubleQuote)) {
          v = v.substring(1, v.length());
        }
        if (v.endsWith(singleQuote) || v.endsWith(doubleQuote)) {
          v = v.substring(0, v.length()-1);
        }
        return v;
      }
      boolean isHexAscii(String v) {
        def len = v.length();
        if (len == 0 || len % 2 != 0) {
          return false;
        }
        for (int i = 0 ; i < len ; i++) {
          if (Character.digit(v.charAt(i), 16) == -1) {
            return false;
          }
        }
        return true;
      }
      String convertHexToString(String hex) {
        StringBuilder sb = new StringBuilder();
        boolean needed_encoding = false;
        for (int i=0; i < hex.length() - 1; i+=2) {
          int cp = Integer.parseInt(hex.substring(i, (i +2)), 16);
          if (cp < 33 || cp == 34 || cp == 127) {
            needed_encoding = true;
          }
          if (cp < 32 || cp == 127) {
            sb.append('^');
            cp ^= 64;
          }
          sb.append((char)cp);
        }
        if (needed_encoding) {
          return sb.toString();
        }
        return hex;
      }
      def possibleHexKeys = ["exe", "cmd", "data", "path", "comm", "file", "name", "watch", "cwd", "acct", "dir", "vm", "old-chardev", "new-chardev", "old-disk", "new-disk", "old-fs", "new-fs", "old-net", "new-net", "device", "cgroup", "apparmor", "operation", "denied_mask", "info", "profile", "requested_mask", "old-rng", "new-rng", "ocomm", "grp", "new_group", "invalid_context", "sw", "root_dir", "proctitle"];
      def audit = ctx.auditd.get("log");
      Iterator entries = audit.entrySet().iterator();
      while (entries.hasNext()) {
        def e = entries.next();
        def k = e.getKey();
        def v = e.getValue();
        // Remove entries whose value is ?
        if (v == "?" || v == "(null)" || v == "") {
          entries.remove();
          continue;
        }
        // Convert hex values to ASCII.
        if (possibleHexKeys.contains(k) && isHexAscii(v)) {
          v = convertHexToString(v);
          audit.put(k, v);
        }
        // Trim quotes.
        if (v instanceof String) {
          v = trimQuotes(params.single_quote, params.double_quote, v);
          audit.put(k, v);
        }
        // Convert arch.
        if (k == "arch" && v == "c000003e") {
          audit.put(k, "x86_64");
        }
      }
    params:
      single_quote: "'"
      double_quote: "\""
# Every document is marked as an event.
- set:
    field: event.kind
    value: event
# event.category / event.type are set according to the audit record type.
# Each value is set by its own conditional processor.
- set:
    if: "ctx.auditd.log?.record_type == 'USER_AUTH'"
    field: event.category
    value: authentication
- set:
    if: "ctx.auditd.log?.record_type == 'USER_AUTH'"
    field: event.type
    value: info
- set:
    if: "ctx.auditd.log?.record_type == 'KERN_MODULE'"
    field: event.category
    value: driver
- set:
    if: "ctx.auditd.log?.record_type == 'KERN_MODULE'"
    field: event.type
    value: info
- set:
    if: "ctx.auditd.log?.record_type == 'SOFTWARE_UPDATE'"
    field: event.category
    value: package
- set:
    if: "ctx.auditd.log?.record_type == 'SOFTWARE_UPDATE'"
    field: event.type
    value: info
- set:
    if: "ctx.auditd.log?.record_type == 'SYSTEM_BOOT' || ctx.auditd.log?.record_type == 'SYSTEM_SHUTDOWN'"
    field: event.category
    value: host
- set:
    if: "ctx.auditd.log?.record_type == 'SYSTEM_BOOT' || ctx.auditd.log?.record_type == 'SYSTEM_SHUTDOWN'"
    field: event.type
    value: info
- set:
    if: "ctx.auditd.log?.record_type == 'SYSCALL' && ctx.auditd.log?.syscall == 'execve'"
    field: event.category
    value: process
- set:
    if: "ctx.auditd.log?.record_type == 'SYSCALL' && ctx.auditd.log?.syscall == 'execve'"
    field: event.type
    value: info
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_CONTROL' || ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'"
    field: event.category
    value: host
# For VIRT_CONTROL records the event type follows the "op" value.
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_CONTROL' && ctx.auditd.log?.op == 'start'"
    field: event.type
    value: start
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_CONTROL' && ctx.auditd.log?.op == 'stop'"
    field: event.type
    value: end
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_CONTROL' && ctx.auditd.log?.op == 'create'"
    field: event.type
    value: creation
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_CONTROL' && ctx.auditd.log?.op == 'delete'"
    field: event.type
    value: deletion
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'"
    field: event.type
    value: creation
# VIRT_MACHINE_ID records also fill container.* from the vm / virt values;
# empty template results are skipped.
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'"
    field: container.name
    value: "{{ auditd.log.vm }}"
    ignore_empty_value: true
- set:
    if: "ctx.auditd.log?.record_type == 'VIRT_MACHINE_ID'"
    field: container.runtime
    value: "{{ auditd.log.virt }}"
    ignore_empty_value: true
# Move auditd.log.* values to host.*, user.* and process.* fields.
# Failures (e.g. the source field is absent) are ignored.
- rename:
    ignore_failure: true
    field: auditd.log.arch
    target_field: host.architecture
# Both acct and user target user.name.
- rename:
    ignore_failure: true
    field: auditd.log.acct
    target_field: user.name
- rename:
    ignore_failure: true
    field: auditd.log.user
    target_field: user.name
- rename:
    ignore_failure: true
    field: auditd.log.uid
    target_field: user.id
- rename:
    ignore_failure: true
    field: auditd.log.gid
    target_field: user.group.id
- rename:
    ignore_failure: true
    field: auditd.log.agid
    target_field: user.audit.group.id
- rename:
    ignore_failure: true
    field: auditd.log.auid
    target_field: user.audit.id
- rename:
    ignore_failure: true
    field: auditd.log.fsgid
    target_field: user.filesystem.group.id
- rename:
    ignore_failure: true
    field: auditd.log.fsuid
    target_field: user.filesystem.id
- rename:
    ignore_failure: true
    field: auditd.log.egid
    target_field: user.effective.group.id
- rename:
    ignore_failure: true
    field: auditd.log.euid
    target_field: user.effective.id
- rename:
    ignore_failure: true
    field: auditd.log.sgid
    target_field: user.saved.group.id
- rename:
    ignore_failure: true
    field: auditd.log.suid
    target_field: user.saved.id
- rename:
    ignore_failure: true
    field: auditd.log.ogid
    target_field: user.owner.group.id
- rename:
    ignore_failure: true
    field: auditd.log.ouid
    target_field: user.owner.id
- rename:
    ignore_failure: true
    field: auditd.log.comm
    target_field: process.name
- rename:
    ignore_failure: true
    field: auditd.log.exe
    target_field: process.executable
- rename:
    ignore_failure: true
    field: auditd.log.pid
    target_field: process.pid
- rename:
    ignore_failure: true
    field: auditd.log.ppid
    target_field: process.ppid
# Store the process ids as longs.
- convert:
    ignore_missing: true
    field: process.pid
    type: long
- convert:
    ignore_missing: true
    field: process.ppid
    type: long
# Turn the command string into an argument list split on whitespace.
- rename:
    ignore_failure: true
    field: auditd.log.cmd
    target_field: process.args
- split:
    ignore_failure: true
    field: process.args
    separator: "\\s+"
- rename:
    ignore_failure: true
    field: auditd.log.argc
    target_field: process.args_count
# When process.args is a list, set process.args_count to its length. This
# replaces any value that was renamed from argc above.
- script:
    if: "ctx?.process?.args != null"
    lang: painless
    source: >-
      if (ctx.process.args instanceof List) {
        ctx.process.args_count = ctx.process.args.length;
      }
- convert:
    ignore_missing: true
    field: process.args_count
    type: long
- rename:
    ignore_failure: true
    field: auditd.log.exit
    target_field: process.exit_code
- convert:
    ignore_missing: true
    field: process.exit_code
    type: long
- rename:
    ignore_missing: true
    field: auditd.log.cwd
    target_field: process.working_directory
- rename:
    ignore_failure: true
    field: auditd.log.terminal
    target_field: user.terminal
- rename:
    ignore_failure: true
    field: auditd.log.msg
    target_field: message
- rename:
    ignore_failure: true
    field: auditd.log.res
    target_field: event.outcome
# The record type becomes the lowercased event.action.
- rename:
    ignore_failure: true
    field: auditd.log.record_type
    target_field: event.action
- lowercase:
    ignore_failure: true
    field: event.action
- rename:
    ignore_failure: true
    field: auditd.log.src
    target_field: source.address
- rename:
    ignore_failure: true
    field: auditd.log.dst
    target_field: destination.address
# Copy source.address into source.ip only when the whole value is an IP.
- grok:
    field: source.address
    patterns:
    - "^%{IP:source.ip}$"
    ignore_failure: true
# Geo lookup for source.ip, written to source.geo.
- geoip:
    field: source.ip
    target_field: source.geo
    ignore_failure: true
# ASN lookup for source.ip using the GeoLite2-ASN database.
- geoip:
    database_file: GeoLite2-ASN.mmdb
    field: source.ip
    target_field: source.as
    properties:
    - asn
    - organization_name
    ignore_missing: true
# Move the ASN lookup results to source.as.number / source.as.organization.name.
- rename:
    field: source.as.asn
    target_field: source.as.number
    ignore_missing: true
- rename:
    field: source.as.organization_name
    target_field: source.as.organization.name
    ignore_missing: true
# Failure handler: store the ingest failure message in error.message.
on_failure:
- set:
    field: error.message
    value: "{{ _ingest.on_failure_message }}"
With pipelines that populate some fields dynamically (e.g. `event.ingested` is populated with `_ingest.timestamp`), pipeline tests will fail, since the current config options offer no way to ignore these kinds of fields.
It would be nice to add a config option that helps in these situations, for example by ignoring the field completely, or by only checking that it exists without comparing its value.
An example log line and pipeline that can be used to test this:
auditd.log
pipeline.yml