Skip to content

Commit a3f26a6

Browse files
author
av.novikov1
committed
Implement Prometheus Remote Write Parser
1 parent b7fac16 commit a3f26a6

2 files changed

Lines changed: 113 additions & 0 deletions

File tree

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package prometheusremotewrite
2+
3+
import (
4+
"fmt"
5+
"math"
6+
"net/http"
7+
"time"
8+
9+
"github.com/influxdata/telegraf"
10+
"github.com/influxdata/telegraf/metric"
11+
12+
"github.com/prometheus/prometheus/prompb"
13+
"github.com/gogo/protobuf/proto"
14+
"github.com/prometheus/common/model"
15+
)
16+
17+
type Parser struct {
18+
DefaultTags map[string]string
19+
Header http.Header
20+
}
21+
22+
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
23+
var metrics []telegraf.Metric
24+
var err error
25+
var req prompb.WriteRequest
26+
27+
28+
if err := proto.Unmarshal(buf, &req); err != nil {
29+
return nil, fmt.Errorf("unable to unmarshal request body: %s", err)
30+
}
31+
32+
33+
now := time.Now()
34+
35+
for _, ts := range req.Timeseries {
36+
tags := map[string]string{}
37+
for key, value := range p.DefaultTags {
38+
tags[key] = value
39+
}
40+
41+
for _, l := range ts.Labels {
42+
tags[l.Name] = l.Value
43+
}
44+
45+
metricName := tags[model.MetricNameLabel]
46+
delete(tags, model.MetricNameLabel)
47+
48+
for _, s := range ts.Samples {
49+
fields := make(map[string]interface{})
50+
fields = getNameAndValue(&s, metricName)
51+
52+
// converting to telegraf metric
53+
if len(fields) > 0 {
54+
t := getTimestamp(&s, now)
55+
metric, err := metric.New("prometheusremotewrite", tags, fields, t)
56+
if err == nil {
57+
metrics = append(metrics, metric)
58+
}
59+
}
60+
}
61+
}
62+
63+
return metrics, err
64+
}
65+
66+
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
67+
metrics, err := p.Parse([]byte(line))
68+
if err != nil {
69+
return nil, err
70+
}
71+
72+
if len(metrics) < 1 {
73+
return nil, fmt.Errorf("No metrics in line")
74+
}
75+
76+
if len(metrics) > 1 {
77+
return nil, fmt.Errorf("More than one metric in line")
78+
}
79+
80+
return metrics[0], nil
81+
}
82+
83+
func (p *Parser) SetDefaultTags(tags map[string]string) {
84+
p.DefaultTags = tags
85+
}
86+
87+
// Get name and value from metric
88+
func getNameAndValue(s *prompb.Sample, metricName string) map[string]interface{} {
89+
fields := make(map[string]interface{})
90+
if !math.IsNaN(s.Value) {
91+
fields[metricName] = s.Value
92+
}
93+
return fields
94+
}
95+
96+
func getTimestamp(s *prompb.Sample, now time.Time) time.Time {
97+
var t time.Time
98+
if s.Timestamp > 0 {
99+
t = time.Unix(0, s.Timestamp*1000000)
100+
} else {
101+
t = now
102+
}
103+
return t
104+
}

plugins/parsers/registry.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/influxdata/telegraf/plugins/parsers/logfmt"
1616
"github.com/influxdata/telegraf/plugins/parsers/nagios"
1717
"github.com/influxdata/telegraf/plugins/parsers/prometheus"
18+
"github.com/influxdata/telegraf/plugins/parsers/prometheusremotewrite"
1819
"github.com/influxdata/telegraf/plugins/parsers/value"
1920
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
2021
)
@@ -237,6 +238,8 @@ func NewParser(config *Config) (Parser, error) {
237238
)
238239
case "prometheus":
239240
parser, err = NewPrometheusParser(config.DefaultTags)
241+
case "prometheusremotewrite":
242+
parser, err = NewPrometheusRemoteWriteParser(config.DefaultTags)
240243
default:
241244
err = fmt.Errorf("Invalid data format: %s", config.DataFormat)
242245
}
@@ -350,3 +353,9 @@ func NewPrometheusParser(defaultTags map[string]string) (Parser, error) {
350353
DefaultTags: defaultTags,
351354
}, nil
352355
}
356+
357+
func NewPrometheusRemoteWriteParser(defaultTags map[string]string) (Parser, error) {
358+
return &prometheusremotewrite.Parser{
359+
DefaultTags: defaultTags,
360+
}, nil
361+
}

0 commit comments

Comments
 (0)