Skip to content

Commit 2b41a1e

Browse files
authored
Carbon2 serializer: sanitize metric name (#9026)
1 parent f0c8549 commit 2b41a1e

6 files changed

Lines changed: 182 additions & 26 deletions

File tree

config/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,6 +1388,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
13881388
c.getFieldString(tbl, "template", &sc.Template)
13891389
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
13901390
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
1391+
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
13911392
c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes)
13921393

13931394
c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
@@ -1449,9 +1450,9 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
14491450

14501451
func (c *Config) missingTomlField(_ reflect.Type, key string) error {
14511452
switch key {
1452-
case "alias", "carbon2_format", "collectd_auth_file", "collectd_parse_multivalue",
1453-
"collectd_security_level", "collectd_typesdb", "collection_jitter", "csv_column_names",
1454-
"csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
1453+
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
1454+
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
1455+
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
14551456
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns",
14561457
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
14571458
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",

plugins/outputs/sumologic/sumologic_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func TestMethod(t *testing.T) {
9696
w.WriteHeader(http.StatusOK)
9797
})
9898

99-
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
99+
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
100100
require.NoError(t, err)
101101

102102
plugin := tt.plugin()
@@ -173,7 +173,7 @@ func TestStatusCode(t *testing.T) {
173173
w.WriteHeader(tt.statusCode)
174174
})
175175

176-
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
176+
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
177177
require.NoError(t, err)
178178

179179
tt.plugin.SetSerializer(serializer)
@@ -199,7 +199,7 @@ func TestContentType(t *testing.T) {
199199
s.headers = map[string]string{
200200
contentTypeHeader: carbon2ContentType,
201201
}
202-
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
202+
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
203203
require.NoError(t, err)
204204
s.SetSerializer(sr)
205205
return s
@@ -213,7 +213,7 @@ func TestContentType(t *testing.T) {
213213
s.headers = map[string]string{
214214
contentTypeHeader: carbon2ContentType,
215215
}
216-
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
216+
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField), carbon2.DefaultSanitizeReplaceChar)
217217
require.NoError(t, err)
218218
s.SetSerializer(sr)
219219
return s
@@ -310,7 +310,7 @@ func TestContentEncodingGzip(t *testing.T) {
310310
w.WriteHeader(http.StatusNoContent)
311311
})
312312

313-
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
313+
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
314314
require.NoError(t, err)
315315

316316
plugin := tt.plugin()
@@ -345,7 +345,7 @@ func TestDefaultUserAgent(t *testing.T) {
345345
MaxRequstBodySize: Default().MaxRequstBodySize,
346346
}
347347

348-
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
348+
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
349349
require.NoError(t, err)
350350

351351
plugin.SetSerializer(serializer)
@@ -594,7 +594,7 @@ func TestMaxRequestBodySize(t *testing.T) {
594594
w.WriteHeader(http.StatusOK)
595595
})
596596

597-
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
597+
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
598598
require.NoError(t, err)
599599

600600
plugin := tt.plugin()
@@ -626,7 +626,7 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
626626
plugin := Default()
627627
plugin.URL = u.String()
628628

629-
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
629+
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
630630
require.NoError(t, err)
631631
plugin.SetSerializer(serializer)
632632

plugins/serializers/carbon2/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f
2121
## * "metric_includes_field"
2222
## * "" - defaults to "field_separate"
2323
# carbon2_format = "field_separate"
24+
25+
## Character used for replacing sanitized characters. By default ":" is used.
26+
## The following character set is being replaced with sanitize replace char:
27+
## !@#$%^&*()+`'\"[]{};<>,?/\\|=
28+
# carbon2_sanitize_replace_char = ":"
2429
```
2530

2631
Standard form:
@@ -52,6 +57,17 @@ metric=name_field_2 host=foo 4 1234567890
5257
metric=name_field_N host=foo 59 1234567890
5358
```
5459

60+
### Metric name sanitization
61+
62+
In order to sanitize the metric name one can specify `carbon2_sanitize_replace_char`
63+
in order to replace the following characters in the metric name:
64+
65+
```
66+
!@#$%^&*()+`'\"[]{};<>,?/\\|=
67+
```
68+
69+
By default they will be replaced with `:`.
70+
5571
## Metrics
5672

5773
The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields.

plugins/serializers/carbon2/carbon2.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package carbon2
22

33
import (
44
"bytes"
5+
"errors"
56
"fmt"
67
"strconv"
78
"strings"
@@ -23,11 +24,23 @@ var formats = map[format]struct{}{
2324
Carbon2FormatMetricIncludesField: {},
2425
}
2526

27+
const (
28+
DefaultSanitizeReplaceChar = ":"
29+
sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
30+
)
31+
2632
type Serializer struct {
27-
metricsFormat format
33+
metricsFormat format
34+
sanitizeReplacer *strings.Replacer
2835
}
2936

30-
func NewSerializer(metricsFormat string) (*Serializer, error) {
37+
func NewSerializer(metricsFormat string, sanitizeReplaceChar string) (*Serializer, error) {
38+
if sanitizeReplaceChar == "" {
39+
sanitizeReplaceChar = DefaultSanitizeReplaceChar
40+
} else if len(sanitizeReplaceChar) > 1 {
41+
return nil, errors.New("sanitize replace char has to be a singular character")
42+
}
43+
3144
var f = format(metricsFormat)
3245

3346
if _, ok := formats[f]; !ok {
@@ -40,7 +53,8 @@ func NewSerializer(metricsFormat string) (*Serializer, error) {
4053
}
4154

4255
return &Serializer{
43-
metricsFormat: f,
56+
metricsFormat: f,
57+
sanitizeReplacer: createSanitizeReplacer(sanitizedChars, rune(sanitizeReplaceChar[0])),
4458
}, nil
4559
}
4660

@@ -65,15 +79,17 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
6579
continue
6680
}
6781

82+
name := s.sanitizeReplacer.Replace(metric.Name())
83+
6884
switch metricsFormat {
6985
case Carbon2FormatFieldSeparate:
7086
m.WriteString(serializeMetricFieldSeparate(
71-
metric.Name(), fieldName,
87+
name, fieldName,
7288
))
7389

7490
case Carbon2FormatMetricIncludesField:
7591
m.WriteString(serializeMetricIncludeField(
76-
metric.Name(), fieldName,
92+
name, fieldName,
7793
))
7894
}
7995

@@ -152,3 +168,13 @@ func bool2int(b bool) int {
152168
}
153169
return i
154170
}
171+
172+
// createSanitizeReplacer creates string replacer replacing all provided
173+
// characters with the replaceChar.
174+
func createSanitizeReplacer(sanitizedChars string, replaceChar rune) *strings.Replacer {
175+
sanitizeCharPairs := make([]string, 0, 2*len(sanitizedChars))
176+
for _, c := range sanitizedChars {
177+
sanitizeCharPairs = append(sanitizeCharPairs, string(c), string(replaceChar))
178+
}
179+
return strings.NewReplacer(sanitizeCharPairs...)
180+
}

plugins/serializers/carbon2/carbon2_test.go

Lines changed: 117 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestSerializeMetricFloat(t *testing.T) {
4646

4747
for _, tc := range testcases {
4848
t.Run(string(tc.format), func(t *testing.T) {
49-
s, err := NewSerializer(string(tc.format))
49+
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
5050
require.NoError(t, err)
5151

5252
buf, err := s.Serialize(m)
@@ -84,7 +84,7 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
8484

8585
for _, tc := range testcases {
8686
t.Run(string(tc.format), func(t *testing.T) {
87-
s, err := NewSerializer(string(tc.format))
87+
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
8888
require.NoError(t, err)
8989

9090
buf, err := s.Serialize(m)
@@ -122,7 +122,7 @@ func TestSerializeWithSpaces(t *testing.T) {
122122

123123
for _, tc := range testcases {
124124
t.Run(string(tc.format), func(t *testing.T) {
125-
s, err := NewSerializer(string(tc.format))
125+
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
126126
require.NoError(t, err)
127127

128128
buf, err := s.Serialize(m)
@@ -160,7 +160,7 @@ func TestSerializeMetricInt(t *testing.T) {
160160

161161
for _, tc := range testcases {
162162
t.Run(string(tc.format), func(t *testing.T) {
163-
s, err := NewSerializer(string(tc.format))
163+
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
164164
require.NoError(t, err)
165165

166166
buf, err := s.Serialize(m)
@@ -198,7 +198,7 @@ func TestSerializeMetricString(t *testing.T) {
198198

199199
for _, tc := range testcases {
200200
t.Run(string(tc.format), func(t *testing.T) {
201-
s, err := NewSerializer(string(tc.format))
201+
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
202202
require.NoError(t, err)
203203

204204
buf, err := s.Serialize(m)
@@ -255,7 +255,7 @@ func TestSerializeMetricBool(t *testing.T) {
255255

256256
for _, tc := range testcases {
257257
t.Run(string(tc.format), func(t *testing.T) {
258-
s, err := NewSerializer(string(tc.format))
258+
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
259259
require.NoError(t, err)
260260

261261
buf, err := s.Serialize(tc.metric)
@@ -300,7 +300,7 @@ metric=cpu_value 42 0
300300

301301
for _, tc := range testcases {
302302
t.Run(string(tc.format), func(t *testing.T) {
303-
s, err := NewSerializer(string(tc.format))
303+
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
304304
require.NoError(t, err)
305305

306306
buf, err := s.SerializeBatch(metrics)
@@ -310,3 +310,113 @@ metric=cpu_value 42 0
310310
})
311311
}
312312
}
313+
314+
func TestSerializeMetricIsProperlySanitized(t *testing.T) {
315+
now := time.Now()
316+
317+
testcases := []struct {
318+
metricFunc func() (telegraf.Metric, error)
319+
format format
320+
expected string
321+
replaceChar string
322+
expectedErr bool
323+
}{
324+
{
325+
metricFunc: func() (telegraf.Metric, error) {
326+
fields := map[string]interface{}{
327+
"usage_idle": float64(91.5),
328+
}
329+
return metric.New("cpu=1", nil, fields, now)
330+
},
331+
format: Carbon2FormatFieldSeparate,
332+
expected: fmt.Sprintf("metric=cpu:1 field=usage_idle 91.5 %d\n", now.Unix()),
333+
replaceChar: DefaultSanitizeReplaceChar,
334+
},
335+
{
336+
metricFunc: func() (telegraf.Metric, error) {
337+
fields := map[string]interface{}{
338+
"usage_idle": float64(91.5),
339+
}
340+
return metric.New("cpu=1", nil, fields, now)
341+
},
342+
format: Carbon2FormatFieldSeparate,
343+
expected: fmt.Sprintf("metric=cpu_1 field=usage_idle 91.5 %d\n", now.Unix()),
344+
replaceChar: "_",
345+
},
346+
{
347+
metricFunc: func() (telegraf.Metric, error) {
348+
fields := map[string]interface{}{
349+
"usage_idle": float64(91.5),
350+
}
351+
return metric.New("cpu=1=tmp$custom", nil, fields, now)
352+
},
353+
format: Carbon2FormatFieldSeparate,
354+
expected: fmt.Sprintf("metric=cpu:1:tmp:custom field=usage_idle 91.5 %d\n", now.Unix()),
355+
replaceChar: DefaultSanitizeReplaceChar,
356+
},
357+
{
358+
metricFunc: func() (telegraf.Metric, error) {
359+
fields := map[string]interface{}{
360+
"usage_idle": float64(91.5),
361+
}
362+
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
363+
},
364+
format: Carbon2FormatFieldSeparate,
365+
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace field=usage_idle 91.5 %d\n", now.Unix()),
366+
replaceChar: DefaultSanitizeReplaceChar,
367+
},
368+
{
369+
metricFunc: func() (telegraf.Metric, error) {
370+
fields := map[string]interface{}{
371+
"usage_idle": float64(91.5),
372+
}
373+
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
374+
},
375+
format: Carbon2FormatMetricIncludesField,
376+
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace_usage_idle 91.5 %d\n", now.Unix()),
377+
replaceChar: DefaultSanitizeReplaceChar,
378+
},
379+
{
380+
metricFunc: func() (telegraf.Metric, error) {
381+
fields := map[string]interface{}{
382+
"usage_idle": float64(91.5),
383+
}
384+
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
385+
},
386+
format: Carbon2FormatMetricIncludesField,
387+
expected: fmt.Sprintf("metric=cpu_1_tmp_custom_namespace_usage_idle 91.5 %d\n", now.Unix()),
388+
replaceChar: "_",
389+
},
390+
{
391+
metricFunc: func() (telegraf.Metric, error) {
392+
fields := map[string]interface{}{
393+
"usage_idle": float64(91.5),
394+
}
395+
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
396+
},
397+
format: Carbon2FormatMetricIncludesField,
398+
expectedErr: true,
399+
replaceChar: "___",
400+
},
401+
}
402+
403+
for _, tc := range testcases {
404+
t.Run(string(tc.format), func(t *testing.T) {
405+
m, err := tc.metricFunc()
406+
require.NoError(t, err)
407+
408+
s, err := NewSerializer(string(tc.format), tc.replaceChar)
409+
if tc.expectedErr {
410+
require.Error(t, err)
411+
return
412+
}
413+
414+
require.NoError(t, err)
415+
416+
buf, err := s.Serialize(m)
417+
require.NoError(t, err)
418+
419+
assert.Equal(t, tc.expected, string(buf))
420+
})
421+
}
422+
}

0 commit comments

Comments
 (0)