Skip to content

Commit 08d78ec

Browse files
committed
Add the new parser as an option to influxdb_v2_listener
As a part of this, move the timefunc out of both influx parsers and into a common location for now.
1 parent 8149f90 commit 08d78ec

11 files changed

Lines changed: 172 additions & 79 deletions

File tree

internal/time.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package internal
2+
3+
import (
4+
"time"
5+
)
6+
7+
type TimeFunc func() time.Time

plugins/inputs/influxdb_listener/influxdb_listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type InfluxDBListener struct {
3939
DatabaseTag string `toml:"database_tag"`
4040
RetentionPolicyTag string `toml:"retention_policy_tag"`
4141

42-
timeFunc influx.TimeFunc
42+
timeFunc internal.TimeFunc
4343

4444
listener net.Listener
4545
server http.Server

plugins/inputs/influxdb_v2_listener/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ Telegraf minimum version: Telegraf 1.16.0
4040
## Optional token to accept for HTTP authentication.
4141
## You probably want to make sure you have TLS configured above for this.
4242
# token = "some-long-shared-secret-token"
43+
44+
## Influx parser type to use. Users can choose between 'internal' and
45+
## 'upstream'. The internal parser is what Telegraf has historically used.
46+
## While the upstream parser involved a large re-write to make it more
47+
## memory efficient and performant.
48+
# parser_type = internal
4349
```
4450

4551
## Metrics

plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
1818
"github.com/influxdata/telegraf/plugins/inputs"
1919
"github.com/influxdata/telegraf/plugins/parsers/influx"
20+
"github.com/influxdata/telegraf/plugins/parsers/influx_upstream"
2021
"github.com/influxdata/telegraf/selfstat"
2122
)
2223

@@ -43,8 +44,9 @@ type InfluxDBV2Listener struct {
4344
MaxBodySize config.Size `toml:"max_body_size"`
4445
Token string `toml:"token"`
4546
BucketTag string `toml:"bucket_tag"`
47+
ParserType string `toml:"parser_type"`
4648

47-
timeFunc influx.TimeFunc
49+
timeFunc internal.TimeFunc
4850

4951
listener net.Listener
5052
server http.Server
@@ -92,6 +94,12 @@ const sampleConfig = `
9294
## Optional token to accept for HTTP authentication.
9395
## You probably want to make sure you have TLS configured above for this.
9496
# token = "some-long-shared-secret-token"
97+
98+
## Influx parser type to use. Users can choose between 'internal' and
99+
## 'upstream'. The internal parser is what Telegraf has historically used.
100+
## While the upstream parser involved a large re-write to make it more
101+
## memory efficient and performant.
102+
# parser_type = internal
95103
`
96104

97105
func (h *InfluxDBV2Listener) SampleConfig() string {
@@ -264,22 +272,35 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
264272
}
265273
return
266274
}
267-
metricHandler := influx.NewMetricHandler()
268-
parser := influx.NewParser(metricHandler)
269-
parser.SetTimeFunc(h.timeFunc)
270275

271276
precisionStr := req.URL.Query().Get("precision")
272-
if precisionStr != "" {
273-
precision := getPrecisionMultiplier(precisionStr)
274-
metricHandler.SetTimePrecision(precision)
275-
}
276277

277278
var metrics []telegraf.Metric
278279
var err error
280+
if h.ParserType == "upstream" {
281+
parser := influx_upstream.NewParser()
282+
parser.SetTimeFunc(h.timeFunc)
283+
284+
if precisionStr != "" {
285+
precision := getPrecisionMultiplier(precisionStr)
286+
parser.SetTimePrecision(precision)
287+
}
279288

280-
metrics, err = parser.Parse(bytes)
289+
metrics, err = parser.Parse(bytes)
290+
} else {
291+
metricHandler := influx.NewMetricHandler()
292+
parser := influx.NewParser(metricHandler)
293+
parser.SetTimeFunc(h.timeFunc)
294+
295+
if precisionStr != "" {
296+
precision := getPrecisionMultiplier(precisionStr)
297+
metricHandler.SetTimePrecision(precision)
298+
}
299+
300+
metrics, err = parser.Parse(bytes)
301+
}
281302

282-
if err != influx.EOF && err != nil {
303+
if err != influx.EOF && err != influx_upstream.ErrEOF && err != nil {
283304
h.Log.Debugf("Error parsing the request body: %v", err.Error())
284305
if err := badRequest(res, Invalid, err.Error()); err != nil {
285306
h.Log.Debugf("error in bad-request: %v", err)

plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go

Lines changed: 80 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -216,17 +216,21 @@ func TestWriteNoNewline(t *testing.T) {
216216
require.NoError(t, listener.Start(acc))
217217
defer listener.Stop()
218218

219-
// post single message to listener
220-
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
221-
require.NoError(t, err)
222-
require.NoError(t, resp.Body.Close())
223-
require.EqualValues(t, 204, resp.StatusCode)
219+
for _, parser := range []string{"internal", "upstream"} {
220+
listener.ParserType = parser
224221

225-
acc.Wait(1)
226-
acc.AssertContainsTaggedFields(t, "cpu_load_short",
227-
map[string]interface{}{"value": float64(12)},
228-
map[string]string{"host": "server01"},
229-
)
222+
// post single message to listener
223+
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
224+
require.NoError(t, err)
225+
require.NoError(t, resp.Body.Close())
226+
require.EqualValues(t, 204, resp.StatusCode)
227+
228+
acc.Wait(1)
229+
acc.AssertContainsTaggedFields(t, "cpu_load_short",
230+
map[string]interface{}{"value": float64(12)},
231+
map[string]string{"host": "server01"},
232+
)
233+
}
230234
}
231235

232236
func TestAllOrNothing(t *testing.T) {
@@ -237,11 +241,14 @@ func TestAllOrNothing(t *testing.T) {
237241
require.NoError(t, listener.Start(acc))
238242
defer listener.Stop()
239243

240-
// post single message to listener
241-
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testPartial)))
242-
require.NoError(t, err)
243-
require.NoError(t, resp.Body.Close())
244-
require.EqualValues(t, 400, resp.StatusCode)
244+
for _, parser := range []string{"internal", "upstream"} {
245+
listener.ParserType = parser
246+
247+
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testPartial)))
248+
require.NoError(t, err)
249+
require.NoError(t, resp.Body.Close())
250+
require.EqualValues(t, 400, resp.StatusCode)
251+
}
245252
}
246253

247254
func TestWriteMaxLineSizeIncrease(t *testing.T) {
@@ -256,11 +263,15 @@ func TestWriteMaxLineSizeIncrease(t *testing.T) {
256263
require.NoError(t, listener.Start(acc))
257264
defer listener.Stop()
258265

259-
// Post a gigantic metric to the listener and verify that it writes OK this time:
260-
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric)))
261-
require.NoError(t, err)
262-
require.NoError(t, resp.Body.Close())
263-
require.EqualValues(t, 204, resp.StatusCode)
266+
for _, parser := range []string{"internal", "upstream"} {
267+
listener.ParserType = parser
268+
269+
// Post a gigantic metric to the listener and verify that it writes OK this time:
270+
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric)))
271+
require.NoError(t, err)
272+
require.NoError(t, resp.Body.Close())
273+
require.EqualValues(t, 204, resp.StatusCode)
274+
}
264275
}
265276

266277
func TestWriteVerySmallMaxBody(t *testing.T) {
@@ -276,10 +287,14 @@ func TestWriteVerySmallMaxBody(t *testing.T) {
276287
require.NoError(t, listener.Start(acc))
277288
defer listener.Stop()
278289

279-
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric)))
280-
require.NoError(t, err)
281-
require.NoError(t, resp.Body.Close())
282-
require.EqualValues(t, 413, resp.StatusCode)
290+
for _, parser := range []string{"internal", "upstream"} {
291+
listener.ParserType = parser
292+
293+
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric)))
294+
require.NoError(t, err)
295+
require.NoError(t, resp.Body.Close())
296+
require.EqualValues(t, 413, resp.StatusCode)
297+
}
283298
}
284299

285300
func TestWriteLargeLine(t *testing.T) {
@@ -437,11 +452,15 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
437452
require.NoError(t, listener.Start(acc))
438453
defer listener.Stop()
439454

440-
// post single message to listener
441-
resp, err := http.Post(createURL(listener, "http", "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg)))
442-
require.NoError(t, err)
443-
require.NoError(t, resp.Body.Close())
444-
require.EqualValues(t, 404, resp.StatusCode)
455+
for _, parser := range []string{"internal", "upstream"} {
456+
listener.ParserType = parser
457+
458+
// post single message to listener
459+
resp, err := http.Post(createURL(listener, "http", "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg)))
460+
require.NoError(t, err)
461+
require.NoError(t, resp.Body.Close())
462+
require.EqualValues(t, 404, resp.StatusCode)
463+
}
445464
}
446465

447466
func TestWriteInvalid(t *testing.T) {
@@ -452,11 +471,15 @@ func TestWriteInvalid(t *testing.T) {
452471
require.NoError(t, listener.Start(acc))
453472
defer listener.Stop()
454473

455-
// post single message to listener
456-
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(badMsg)))
457-
require.NoError(t, err)
458-
require.NoError(t, resp.Body.Close())
459-
require.EqualValues(t, 400, resp.StatusCode)
474+
for _, parser := range []string{"internal", "upstream"} {
475+
listener.ParserType = parser
476+
477+
// post single message to listener
478+
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(badMsg)))
479+
require.NoError(t, err)
480+
require.NoError(t, resp.Body.Close())
481+
require.EqualValues(t, 400, resp.StatusCode)
482+
}
460483
}
461484

462485
func TestWriteEmpty(t *testing.T) {
@@ -467,11 +490,15 @@ func TestWriteEmpty(t *testing.T) {
467490
require.NoError(t, listener.Start(acc))
468491
defer listener.Stop()
469492

470-
// post single message to listener
471-
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(emptyMsg)))
472-
require.NoError(t, err)
473-
require.NoError(t, resp.Body.Close())
474-
require.EqualValues(t, 204, resp.StatusCode)
493+
for _, parser := range []string{"internal", "upstream"} {
494+
listener.ParserType = parser
495+
496+
// post single message to listener
497+
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(emptyMsg)))
498+
require.NoError(t, err)
499+
require.NoError(t, resp.Body.Close())
500+
require.EqualValues(t, 204, resp.StatusCode)
501+
}
475502
}
476503

477504
func TestReady(t *testing.T) {
@@ -504,17 +531,20 @@ func TestWriteWithPrecision(t *testing.T) {
504531
defer listener.Stop()
505532

506533
msg := "xyzzy value=42 1422568543\n"
507-
resp, err := http.Post(
508-
createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s"), "", bytes.NewBuffer([]byte(msg)))
509-
require.NoError(t, err)
510-
require.NoError(t, resp.Body.Close())
511-
require.EqualValues(t, 204, resp.StatusCode)
512-
513-
acc.Wait(1)
514-
require.Equal(t, 1, len(acc.Metrics))
515-
// When timestamp is provided, the precision parameter is
516-
// overloaded to specify the timestamp's unit
517-
require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time)
534+
for _, parser := range []string{"internal", "upstream"} {
535+
listener.ParserType = parser
536+
537+
resp, err := http.Post(
538+
createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s"), "", bytes.NewBuffer([]byte(msg)))
539+
require.NoError(t, err)
540+
require.NoError(t, resp.Body.Close())
541+
require.EqualValues(t, 204, resp.StatusCode)
542+
543+
acc.Wait(1)
544+
// When timestamp is provided, the precision parameter is
545+
// overloaded to specify the timestamp's unit
546+
require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time)
547+
}
518548
}
519549

520550
func TestWriteWithPrecisionNoTimestamp(t *testing.T) {

plugins/parsers/influx/README.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
# InfluxDB Line Protocol
1+
# Influx Line Protocol
22

3-
There are no additional configuration options for InfluxDB [line protocol][]. The
4-
metrics are parsed directly into Telegraf metrics.
3+
Parses metrics using the [Influx Line Protocol][].
54

6-
[line protocol]: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
5+
[Influx Line Protocol]: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
76

87
## Configuration
98

@@ -16,4 +15,10 @@ metrics are parsed directly into Telegraf metrics.
1615
## more about them here:
1716
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
1817
data_format = "influx"
18+
19+
## Influx parser type to use. Users can choose between 'internal' and
20+
## 'upstream'. The internal parser is what Telegraf has historically used.
21+
## While the upstream parser involved a large re-write to make it more
22+
## memory efficient and performant.
23+
## influx_parser_version = "internal"
1924
```

plugins/parsers/influx/handler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import (
77
"time"
88

99
"github.com/influxdata/telegraf"
10+
"github.com/influxdata/telegraf/internal"
1011
"github.com/influxdata/telegraf/metric"
1112
)
1213

1314
// MetricHandler implements the Handler interface and produces telegraf.Metric.
1415
type MetricHandler struct {
1516
timePrecision time.Duration
16-
timeFunc TimeFunc
17+
timeFunc internal.TimeFunc
1718
metric telegraf.Metric
1819
}
1920

@@ -34,7 +35,7 @@ func (h *MetricHandler) SetTimePrecision(p time.Duration) {
3435
// overloaded to hold the unit of measurement of the timestamp.
3536
}
3637

37-
func (h *MetricHandler) SetTimeFunc(f TimeFunc) {
38+
func (h *MetricHandler) SetTimeFunc(f internal.TimeFunc) {
3839
h.timeFunc = f
3940
}
4041

plugins/parsers/influx/parser.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/influxdata/telegraf"
12+
"github.com/influxdata/telegraf/internal"
1213
)
1314

1415
const (
@@ -19,8 +20,6 @@ var (
1920
ErrNoMetric = errors.New("no metric in line")
2021
)
2122

22-
type TimeFunc func() time.Time
23-
2423
// ParseError indicates a error in the parsing of the text.
2524
type ParseError struct {
2625
Offset int
@@ -82,7 +81,7 @@ func NewSeriesParser(handler *MetricHandler) *Parser {
8281
}
8382
}
8483

85-
func (p *Parser) SetTimeFunc(f TimeFunc) {
84+
func (p *Parser) SetTimeFunc(f internal.TimeFunc) {
8685
p.handler.SetTimeFunc(f)
8786
}
8887

@@ -178,7 +177,7 @@ func NewStreamParser(r io.Reader) *StreamParser {
178177
// SetTimeFunc changes the function used to determine the time of metrics
179178
// without a timestamp. The default TimeFunc is time.Now. Useful mostly for
180179
// testing, or perhaps if you want all metrics to have the same timestamp.
181-
func (sp *StreamParser) SetTimeFunc(f TimeFunc) {
180+
func (sp *StreamParser) SetTimeFunc(f internal.TimeFunc) {
182181
sp.handler.SetTimeFunc(f)
183182
}
184183

0 commit comments

Comments
 (0)