
Panic in http_listener_v2 when sending metrics (e.g. via Prometheus remote write) and killing Telegraf with an OS signal #10125

@pmalek-sumo

Description


Relevant telegraf.conf

```toml
[agent]
  interval = "1s"
  flush_interval = "1s"

[[inputs.http_listener_v2]]
  # wait longer than prometheus
  read_timeout = "30s"
  write_timeout = "30s"
  service_address = ":9888"
  data_format = "prometheusremotewrite"
  paths = [
    "/prometheus.metrics",
  ]

[[outputs.file]]
  files = ["x"]
  data_format = "prometheus"
```

System info

Telegraf master HEAD: 146fff3, macOS

Docker

No response

Steps to reproduce

  1. Run Telegraf with the config above.
  2. Run the following Go program, which continuously sends metrics to Telegraf via Prometheus remote write:
```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

func send(url string) {
	// Create the WriteRequest object with metric data populated.
	// prompb uses value (non-pointer) slices for TimeSeries and Label.
	wr := &prompb.WriteRequest{
		Timeseries: []prompb.TimeSeries{
			{
				Labels: []prompb.Label{
					{
						Name:  "__name__",
						Value: "foo",
					},
				},
				Samples: []prompb.Sample{
					{
						Timestamp: 1577836800000,
						Value:     100.0,
					},
				},
			},
		},
	}

	// Marshal the data into a byte slice using the protobuf library.
	data, err := proto.Marshal(wr)
	if err != nil {
		panic(err)
	}

	// Encode the content using snappy.
	encoded := snappy.Encode(nil, data)
	body := bytes.NewReader(encoded)

	// Create an HTTP request from the body content and set the necessary parameters.
	req, err := http.NewRequest("POST", url, body)
	if err != nil {
		panic(err)
	}

	// Set the HTTP headers required by the remote write protocol.
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

	// Send the request to Telegraf's http_listener_v2.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Drain the body so the keep-alive connection can be reused.
	_, _ = io.Copy(io.Discard, resp.Body)
}

func main() {
	fmt.Println("start!")
	for {
		send("http://localhost:9888/prometheus.metrics")
	}
}
```
  3. Kill Telegraf (e.g. with Ctrl+C) while the program is still sending.
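The panic itself does not depend on Prometheus remote write: any goroutine that sends on a channel after it has been closed panics with `send on closed channel`. The standalone sketch below uses hypothetical names and no Telegraf code. It closes the channel first, as the agent does on shutdown, and then sends from a second goroutine, as an in-flight HTTP handler would. Because `net/http` recovers panics raised in handlers and logs them as `http: panic serving ...`, the trace under Actual behavior is logged and shutdown carries on; the sketch uses `recover` for the same reason.

```go
package main

import (
	"fmt"
	"sync"
)

// sendAfterClose mimics the shutdown race: the channel owner closes the
// metrics channel while a producer (an in-flight handler) still sends on it.
// It returns the recovered panic message, or "" if no panic happened.
func sendAfterClose() (msg string) {
	metrics := make(chan int, 1)

	// The "agent" shuts down and closes the channel first.
	close(metrics)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() {
			if r := recover(); r != nil {
				msg = fmt.Sprint(r)
			}
		}()
		// The "handler" is still running and tries to add a metric.
		metrics <- 42
	}()
	wg.Wait() // msg is safely visible after Wait returns
	return msg
}

func main() {
	fmt.Println(sendAfterClose()) // prints "send on closed channel"
}
```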

Expected behavior

No panic

Actual behavior

```text
2021-11-18T15:37:42Z I! [inputs.http_listener_v2] Listening on [::]:9888
^C2021-11-18T15:37:49Z E! [inputs.http_listener_v2] Serve failed: accept tcp [::]:9888: use of closed network connection
2021-11-18T15:37:49Z I! [agent] Hang on, flushing any cached metrics before shutdown
2021-11-18T15:37:49Z I! http: panic serving [::1]:51796: send on closed channel
goroutine 23 [running]:
net/http.(*conn).serve.func1()
        /Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:1801 +0xb9
panic({0x7e03280, 0x8ef0500})
        /Users/pmalek/.gvm/gos/go1.17.3/src/runtime/panic.go:1047 +0x266
github.com/influxdata/telegraf/agent.(*accumulator).AddMetric(0xc000649320, {0x90d58a8, 0xc001039740})
        /Users/pmalek/code/telegraf/telegraf/agent/accumulator.go:88 +0xcb
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).serveWrite(0xc000783040, {0x8fda148, 0xc00015ba40}, 0xc000164700)
        /Users/pmalek/code/telegraf/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:274 +0x531
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).authenticateIfSet(0xc000e54335, 0x13, {0x8fda148, 0xc00015ba40}, 0x406f160)
        /Users/pmalek/code/telegraf/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:384 +0x11b
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).ServeHTTP(0xc000783040, {0x8fda148, 0xc00015ba40}, 0xc000164700)
        /Users/pmalek/code/telegraf/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:212 +0x9b
net/http.serverHandler.ServeHTTP({0x8fc3140}, {0x8fda148, 0xc00015ba40}, 0xc000164700)
        /Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:2878 +0x43b
net/http.(*conn).serve(0xc00012e000, {0x902e530, 0xc0005d49c0})
        /Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:1929 +0xb08
created by net/http.(*Server).Serve
        /Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:3033 +0x4e8
2021-11-18T15:37:49Z I! [agent] Stopping running outputs
```

Additional info

Currently there is no simple way to prevent this, because the Accumulator interface lacks both:

  • a method to check whether metrics can still be sent
  • a destructor that would close a channel, which could then be used to make the send in agent/accumulator.go conditional (via a select):
    ```go
    func (ac *accumulator) AddMetric(m telegraf.Metric) {
    	m.SetTime(m.Time().Round(ac.precision))
    	if m := ac.maker.MakeMetric(m); m != nil {
    		// Unconditional send: panics if ac.metrics has already been closed.
    		ac.metrics <- m
    	}
    }
    ```
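The destructor-plus-select idea can be sketched as follows. This is a hypothetical accumulator, not Telegraf's `agent.accumulator`; `Metric`, `newAccumulator`, `Closed` and `Close` are illustrative names. One detail matters: a `select` cannot guard a send on a channel that may already be closed, because a send case on a closed channel counts as ready and panics when chosen. The sketch therefore never closes `metrics` while senders may exist and signals shutdown by closing a separate `done` channel instead; `Close` also unblocks handlers stuck on a full channel.

```go
package main

import (
	"fmt"
	"sync"
)

// Metric is a hypothetical stand-in for telegraf.Metric.
type Metric struct{ Name string }

// accumulator is a hypothetical sketch. The metrics channel is never closed
// here; shutdown is signalled by closing done instead.
type accumulator struct {
	metrics   chan Metric
	done      chan struct{}
	closeOnce sync.Once
}

func newAccumulator(metrics chan Metric) *accumulator {
	return &accumulator{metrics: metrics, done: make(chan struct{})}
}

// AddMetric delivers m unless the accumulator has been closed.
// It reports whether the metric was accepted.
func (ac *accumulator) AddMetric(m Metric) bool {
	// Reject deterministically once closed; without this check, if both
	// cases below were ready, select would pick one at random.
	select {
	case <-ac.done:
		return false
	default:
	}
	select {
	case ac.metrics <- m:
		return true
	case <-ac.done: // unblocks a sender waiting on a full channel
		return false
	}
}

// Closed reports whether Close has been called ("can metrics be sent?").
func (ac *accumulator) Closed() bool {
	select {
	case <-ac.done:
		return true
	default:
		return false
	}
}

// Close is the "destructor". It is safe to call more than once.
func (ac *accumulator) Close() {
	ac.closeOnce.Do(func() { close(ac.done) })
}

func main() {
	ac := newAccumulator(make(chan Metric, 1))
	fmt.Println(ac.AddMetric(Metric{Name: "foo"})) // true: buffer has room
	ac.Close()
	fmt.Println(ac.AddMetric(Metric{Name: "bar"})) // false: rejected, no panic
	fmt.Println(ac.Closed())                       // true
}
```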
