|
| 1 | +package prometheusremotewrite |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "math" |
| 6 | + "net/http" |
| 7 | + "time" |
| 8 | + |
| 9 | + "github.com/influxdata/telegraf" |
| 10 | + "github.com/influxdata/telegraf/metric" |
| 11 | + |
| 12 | + "github.com/prometheus/prometheus/prompb" |
| 13 | + "github.com/gogo/protobuf/proto" |
| 14 | + "github.com/prometheus/common/model" |
| 15 | +) |
| 16 | + |
| 17 | +type Parser struct { |
| 18 | + DefaultTags map[string]string |
| 19 | + Header http.Header |
| 20 | +} |
| 21 | + |
| 22 | +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { |
| 23 | + var metrics []telegraf.Metric |
| 24 | + var err error |
| 25 | + var req prompb.WriteRequest |
| 26 | + |
| 27 | + |
| 28 | + if err := proto.Unmarshal(buf, &req); err != nil { |
| 29 | + return nil, fmt.Errorf("unable to unmarshal request body: %s", err) |
| 30 | + } |
| 31 | + |
| 32 | + |
| 33 | + now := time.Now() |
| 34 | + |
| 35 | + for _, ts := range req.Timeseries { |
| 36 | + tags := map[string]string{} |
| 37 | + for key, value := range p.DefaultTags { |
| 38 | + tags[key] = value |
| 39 | + } |
| 40 | + |
| 41 | + for _, l := range ts.Labels { |
| 42 | + tags[l.Name] = l.Value |
| 43 | + } |
| 44 | + |
| 45 | + metricName := tags[model.MetricNameLabel] |
| 46 | + delete(tags, model.MetricNameLabel) |
| 47 | + |
| 48 | + for _, s := range ts.Samples { |
| 49 | + fields := make(map[string]interface{}) |
| 50 | + fields = getNameAndValue(&s, metricName) |
| 51 | + |
| 52 | + // converting to telegraf metric |
| 53 | + if len(fields) > 0 { |
| 54 | + t := getTimestamp(&s, now) |
| 55 | + metric, err := metric.New("prometheusremotewrite", tags, fields, t) |
| 56 | + if err == nil { |
| 57 | + metrics = append(metrics, metric) |
| 58 | + } |
| 59 | + } |
| 60 | + } |
| 61 | + } |
| 62 | + |
| 63 | + return metrics, err |
| 64 | +} |
| 65 | + |
| 66 | +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { |
| 67 | + metrics, err := p.Parse([]byte(line)) |
| 68 | + if err != nil { |
| 69 | + return nil, err |
| 70 | + } |
| 71 | + |
| 72 | + if len(metrics) < 1 { |
| 73 | + return nil, fmt.Errorf("No metrics in line") |
| 74 | + } |
| 75 | + |
| 76 | + if len(metrics) > 1 { |
| 77 | + return nil, fmt.Errorf("More than one metric in line") |
| 78 | + } |
| 79 | + |
| 80 | + return metrics[0], nil |
| 81 | +} |
| 82 | + |
| 83 | +func (p *Parser) SetDefaultTags(tags map[string]string) { |
| 84 | + p.DefaultTags = tags |
| 85 | +} |
| 86 | + |
| 87 | +// Get name and value from metric |
| 88 | +func getNameAndValue(s *prompb.Sample, metricName string) map[string]interface{} { |
| 89 | + fields := make(map[string]interface{}) |
| 90 | + if !math.IsNaN(s.Value) { |
| 91 | + fields[metricName] = s.Value |
| 92 | + } |
| 93 | + return fields |
| 94 | +} |
| 95 | + |
| 96 | +func getTimestamp(s *prompb.Sample, now time.Time) time.Time { |
| 97 | + var t time.Time |
| 98 | + if s.Timestamp > 0 { |
| 99 | + t = time.Unix(0, s.Timestamp*1000000) |
| 100 | + } else { |
| 101 | + t = now |
| 102 | + } |
| 103 | + return t |
| 104 | +} |
0 commit comments