Skip to content

Commit 4096f9b

Browse files
authored
[Azure] Sanitize message in case of malformed json (#34874)
* Add sanitization function and test for azure input
1 parent 13eea9d commit 4096f9b

5 files changed

Lines changed: 180 additions & 0 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
287287
- Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036]
288288
- Add support for collecting IPv6 metrics. {pull}35123[35123]
289289
- Add oracle authentication messages parsing {pull}35127[35127]
290+
- Add sanitization capabilities to azure-eventhub input {pull}34874[34874]
290291
- Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204]
291292
- Add execution budget to CEL input. {pull}35409[35409]
292293
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]

x-pack/filebeat/input/azureeventhub/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ type azureInputConfig struct {
2525
SAContainer string `config:"storage_account_container"`
2626
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
2727
OverrideEnvironment string `config:"resource_manager_endpoint"`
28+
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
29+
SanitizeOptions []string `config:"sanitize_options"`
2830
}
2931

3032
const ephContainerName = "filebeat"
@@ -63,6 +65,14 @@ func (conf *azureInputConfig) Validate() error {
6365
return err
6466
}
6567

68+
// log a warning for each sanitization option not supported
69+
for _, opt := range conf.SanitizeOptions {
70+
err := sanitizeOptionsValidate(opt)
71+
if err != nil {
72+
logger.Warnf("%s: %v", opt, err)
73+
}
74+
}
75+
6676
return nil
6777
}
6878

x-pack/filebeat/input/azureeventhub/input.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,15 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo
166166
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
167167
var mapObject map[string][]interface{}
168168
var messages []string
169+
170+
// Clean up the message for known issues [1] where Azure services produce malformed JSON documents.
171+
// Sanitization occurs if options are available and the message contains an invalid JSON.
172+
//
173+
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
174+
if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
175+
bMessage = sanitize(bMessage, a.config.SanitizeOptions...)
176+
}
177+
169178
// check if the message is a "records" object containing a list of events
170179
err := json.Unmarshal(bMessage, &mapObject)
171180
if err == nil {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
//go:build !aix
6+
// +build !aix
7+
8+
package azureeventhub
9+
10+
import (
11+
"bytes"
12+
"errors"
13+
)
14+
15+
type sanitizationOption string
16+
17+
const (
18+
newLines sanitizationOption = "NEW_LINES"
19+
singleQuotes sanitizationOption = "SINGLE_QUOTES"
20+
)
21+
22+
// sanitizeOptionsValidate validates for supported sanitization options
23+
func sanitizeOptionsValidate(s string) error {
24+
switch s {
25+
case "NEW_LINES":
26+
return nil
27+
case "SINGLE_QUOTES":
28+
return nil
29+
default:
30+
return errors.New("invalid sanitization option")
31+
}
32+
}
33+
34+
// sanitize applies the sanitization options specified in the config
35+
// if no sanitization options are provided, the message remains unchanged
36+
func sanitize(jsonByte []byte, opts ...string) []byte {
37+
res := jsonByte
38+
39+
for _, opt := range opts {
40+
switch sanitizationOption(opt) {
41+
case newLines:
42+
res = sanitizeNewLines(res)
43+
case singleQuotes:
44+
res = sanitizeSingleQuotes(res)
45+
}
46+
}
47+
48+
return res
49+
}
50+
51+
// sanitizeNewLines removes newlines found in the message
52+
func sanitizeNewLines(jsonByte []byte) []byte {
53+
return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{})
54+
}
55+
56+
// sanitizeSingleQuotes replaces single quotes with double quotes in the message
57+
// single quotes that are in between double quotes remain unchanged
58+
func sanitizeSingleQuotes(jsonByte []byte) []byte {
59+
var result bytes.Buffer
60+
var prevChar byte
61+
62+
inDoubleQuotes := false
63+
64+
for _, r := range jsonByte {
65+
if r == '"' && prevChar != '\\' {
66+
inDoubleQuotes = !inDoubleQuotes
67+
}
68+
69+
if r == '\'' && !inDoubleQuotes {
70+
result.WriteRune('"')
71+
} else {
72+
result.WriteByte(r)
73+
}
74+
prevChar = r
75+
}
76+
77+
return result.Bytes()
78+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
//go:build !aix
6+
// +build !aix
7+
8+
package azureeventhub
9+
10+
import (
11+
"fmt"
12+
"testing"
13+
14+
"github.com/stretchr/testify/assert"
15+
16+
"github.com/elastic/elastic-agent-libs/logp"
17+
)
18+
19+
func TestParseMultipleMessagesSanitization(t *testing.T) {
20+
msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
21+
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
22+
"{\"time\": \"2023-04-11T13:35:20Z\", \"resourceId\": \"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\", \"category\": \"FunctionAppLogs\", \"operationName\": \"Microsoft.Web/sites/functions/log\", \"level\": \"Informational\", \"location\": \"West Europe\", \"properties\": {'appName':'REDACTED','roleInstance':'REDACTED','message':'Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe ','category':'Function.HttpTriggerJava.User','hostVersion':'4.16.5.5','functionInvocationId':'REDACTED','functionName':'HttpTriggerJava','hostInstanceId':'REDACTED','level':'Information','levelId':2,'processId':62}}]}"
23+
msgs := []string{
24+
"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
25+
"{\"test\":\"this is '2nd' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}",
26+
"{\"category\":\"FunctionAppLogs\",\"level\":\"Informational\",\"location\":\"West Europe\",\"operationName\":\"Microsoft.Web/sites/functions/log\",\"properties\":{\"appName\":\"REDACTED\",\"category\":\"Function.HttpTriggerJava.User\",\"functionInvocationId\":\"REDACTED\",\"functionName\":\"HttpTriggerJava\",\"hostInstanceId\":\"REDACTED\",\"hostVersion\":\"4.16.5.5\",\"level\":\"Information\",\"levelId\":2,\"message\":\"Elastic Test Function Trigger. ---- West Europe West Europe West Europe West Europe West Europe \",\"processId\":62,\"roleInstance\":\"REDACTED\"},\"resourceId\":\"/SUBSCRIPTIONS/REDACTED/RESOURCEGROUPS/ELASTIC-FUNCTION-TEST/PROVIDERS/MICROSOFT.WEB/SITES/REDACTED\",\"time\":\"2023-04-11T13:35:20Z\"}",
27+
}
28+
29+
input := azureInput{
30+
log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)),
31+
config: azureInputConfig{
32+
SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"},
33+
},
34+
}
35+
36+
messages := input.parseMultipleMessages([]byte(msg))
37+
assert.NotNil(t, messages)
38+
assert.Equal(t, len(messages), 3)
39+
for _, ms := range messages {
40+
assert.Contains(t, msgs, ms)
41+
}
42+
}
43+
44+
func TestSanitize(t *testing.T) {
45+
jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}")
46+
47+
testCases := []struct {
48+
name string
49+
opts []string
50+
expected []byte
51+
}{
52+
{
53+
name: "no options",
54+
opts: []string{},
55+
expected: jsonByte,
56+
},
57+
{
58+
name: "NEW_LINES option",
59+
opts: []string{"NEW_LINES"},
60+
expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
61+
},
62+
{
63+
name: "SINGLE_QUOTES option",
64+
opts: []string{"SINGLE_QUOTES"},
65+
expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
66+
},
67+
{
68+
name: "both options",
69+
opts: []string{"NEW_LINES", "SINGLE_QUOTES"},
70+
expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
71+
},
72+
}
73+
74+
// Run test cases
75+
for _, tc := range testCases {
76+
tc := tc
77+
t.Run(tc.name, func(t *testing.T) {
78+
res := sanitize(jsonByte, tc.opts...)
79+
assert.Equal(t, tc.expected, res)
80+
})
81+
}
82+
}

0 commit comments

Comments
 (0)