Skip to content

Commit 2b94416

Browse files
committed
Make set processor with copy_from compatible with ES < 7.13
Automatically rewrite Ingest Node `set` processors that use `copy_from` when connected to Elasticsearch versions less than 7.13. The copy_from is replaced with `{{{$copy_from value}}}`. The triple brace ensures the behavior is the same as copy_from in that no escaping is added.
1 parent 4accfa8 commit 2b94416

3 files changed

Lines changed: 87 additions & 0 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
280280
- Fix bug in `httpjson` that prevented `first_event` getting updated. {pull}26407[26407]
281281
- Fix bug in the Syslog input that misparsed rfc5424 days starting with 0. {pull}26419[26419]
282282
- Do not close filestream harvester if an unexpected error is returned when close.on_state_change.* is enabled. {pull}26411[26411]
283+
- Fix module pipeline compatability for Elasticsearch versions less than 7.13.0 in modules that use Ingest Node `set` processors with `copy_from`. {pull}26593[26593]
283284

284285
*Filebeat*
285286

filebeat/fileset/compatibility.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ var processorCompatibilityChecks = []processorCompatibility{
5050
},
5151
adaptConfig: deleteProcessor,
5252
},
53+
{
54+
procType: "set",
55+
checkVersion: func(esVersion *common.Version) bool {
56+
return esVersion.LessThan(common.MustNewVersion("7.13.0"))
57+
},
58+
adaptConfig: replaceSetCopyFromValue,
59+
},
5360
{
5461
procType: "set",
5562
checkVersion: func(esVersion *common.Version) bool {
@@ -174,6 +181,22 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger)
174181
return false, nil
175182
}
176183

184+
// replaceSetCopyFromValue replaces copy_from option with a 'value'.
185+
func replaceSetCopyFromValue(config map[string]interface{}, log *logp.Logger) (bool, error) {
186+
copyFrom, ok := config["copy_from"].(string)
187+
if !ok {
188+
return false, nil
189+
}
190+
191+
log.Debug("Removing unsupported 'copy_from' from set processor.")
192+
delete(config, "copy_from")
193+
194+
value := "{{{" + copyFrom + "}}}"
195+
log.Debugf("Adding 'value: %q' to replace 'copy_from' in set processor.", value)
196+
config["value"] = value
197+
return false, nil
198+
}
199+
177200
// replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement
178201
// so ES less than 7.10 will work.
179202
func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) (bool, error) {

filebeat/fileset/compatibility_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,69 @@ func TestReplaceSetIgnoreEmptyValue(t *testing.T) {
315315
}
316316
}
317317

318+
func TestReplaceSetCopyFromValue(t *testing.T) {
319+
setWithCopyFrom := map[string]interface{}{
320+
"processors": []interface{}{
321+
map[string]interface{}{
322+
"set": map[string]interface{}{
323+
"field": "rule.name",
324+
"copy_from": "panw.panos.ruleset",
325+
},
326+
},
327+
}}
328+
329+
cases := []struct {
330+
name string
331+
esVersion *common.Version
332+
content map[string]interface{}
333+
expected map[string]interface{}
334+
isErrExpected bool
335+
}{
336+
{
337+
name: "ES < 7.13.0",
338+
esVersion: common.MustNewVersion("7.12.2"),
339+
content: setWithCopyFrom,
340+
expected: map[string]interface{}{
341+
"processors": []interface{}{
342+
map[string]interface{}{
343+
"set": map[string]interface{}{
344+
"field": "rule.name",
345+
"value": "{{{panw.panos.ruleset}}}",
346+
},
347+
},
348+
},
349+
},
350+
isErrExpected: false,
351+
},
352+
{
353+
name: "ES == 7.13.0",
354+
esVersion: common.MustNewVersion("7.13.0"),
355+
content: setWithCopyFrom,
356+
expected: setWithCopyFrom,
357+
},
358+
{
359+
name: "ES > 7.13.0",
360+
esVersion: common.MustNewVersion("8.0.0"),
361+
content: setWithCopyFrom,
362+
expected: setWithCopyFrom,
363+
},
364+
}
365+
366+
for _, test := range cases {
367+
test := test
368+
t.Run(test.name, func(t *testing.T) {
369+
t.Parallel()
370+
err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName))
371+
if test.isErrExpected {
372+
assert.Error(t, err)
373+
} else {
374+
require.NoError(t, err)
375+
assert.Equal(t, test.expected, test.content, test.name)
376+
}
377+
})
378+
}
379+
}
380+
318381
func TestReplaceAppendAllowDuplicates(t *testing.T) {
319382
cases := []struct {
320383
name string

0 commit comments

Comments
 (0)