Relevant telegraf.conf
[agent]
interval = "1s"
flush_interval = "1s"
[[inputs.http_listener_v2]]
# wait longer than prometheus
read_timeout = "30s"
write_timeout = "30s"
service_address = ":9888"
data_format = "prometheusremotewrite"
paths = [
"/prometheus.metrics",
]
[[outputs.file]]
files = ["x"]
data_format = "prometheus"
System info
Telegraf master HEAD : 146fff3, MacOS
Docker
No response
Steps to reproduce
- Run telegraf with attached config
- Run the following Go program which sends lots of metrics via Prometheus remote write
package main

import (
	"bytes"
	"fmt"
	"net/http"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

func send(url string) {
	// Create the WriteRequest object with metric data populated.
	wr := &prompb.WriteRequest{
		Timeseries: []*prompb.TimeSeries{
			{
				Labels: []*prompb.Label{
					{
						Name:  "__name__",
						Value: "foo",
					},
				},
				Samples: []prompb.Sample{
					{
						Timestamp: 1577836800000,
						Value:     100.0,
					},
				},
			},
		},
	}
	// Marshal the data into a byte slice using the protobuf library.
	data, err := proto.Marshal(wr)
	if err != nil {
		panic(err)
	}
	// Encode the content into snappy encoding.
	encoded := snappy.Encode(nil, data)
	body := bytes.NewReader(encoded)
	// Create an HTTP request from the body content and set necessary parameters.
	req, err := http.NewRequest("POST", url, body)
	if err != nil {
		panic(err)
	}
	// Set the required HTTP header content.
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	// Send the request to the Telegraf listener.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
}

func main() {
	fmt.Println("start!")
	// Send requests in a tight loop so that some are in flight when Telegraf stops.
	for {
		send("http://localhost:9888/prometheus.metrics")
	}
}
- Kill telegraf (Ctrl-C) while the program is still sending
Expected behavior
No panic
Actual behavior
2021-11-18T15:37:42Z I! [inputs.http_listener_v2] Listening on [::]:9888
^C2021-11-18T15:37:49Z E! [inputs.http_listener_v2] Serve failed: accept tcp [::]:9888: use of closed network connection
2021-11-18T15:37:49Z I! [agent] Hang on, flushing any cached metrics before shutdown
2021-11-18T15:37:49Z I! http: panic serving [::1]:51796: send on closed channel
goroutine 23 [running]:
net/http.(*conn).serve.func1()
/Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:1801 +0xb9
panic({0x7e03280, 0x8ef0500})
/Users/pmalek/.gvm/gos/go1.17.3/src/runtime/panic.go:1047 +0x266
github.com/influxdata/telegraf/agent.(*accumulator).AddMetric(0xc000649320, {0x90d58a8, 0xc001039740})
/Users/pmalek/code/telegraf/telegraf/agent/accumulator.go:88 +0xcb
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).serveWrite(0xc000783040, {0x8fda148, 0xc00015ba40}, 0xc000164700)
/Users/pmalek/code/telegraf/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:274 +0x531
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).authenticateIfSet(0xc000e54335, 0x13, {0x8fda148, 0xc00015ba40}, 0x406f160)
/Users/pmalek/code/telegraf/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:384 +0x11b
github.com/influxdata/telegraf/plugins/inputs/http_listener_v2.(*HTTPListenerV2).ServeHTTP(0xc000783040, {0x8fda148, 0xc00015ba40}, 0xc000164700)
/Users/pmalek/code/telegraf/telegraf/plugins/inputs/http_listener_v2/http_listener_v2.go:212 +0x9b
net/http.serverHandler.ServeHTTP({0x8fc3140}, {0x8fda148, 0xc00015ba40}, 0xc000164700)
/Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:2878 +0x43b
net/http.(*conn).serve(0xc00012e000, {0x902e530, 0xc0005d49c0})
/Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:1929 +0xb08
created by net/http.(*Server).Serve
/Users/pmalek/.gvm/gos/go1.17.3/src/net/http/server.go:3033 +0x4e8
2021-11-18T15:37:49Z I! [agent] Stopping running outputs
Additional info
As of now there is no simple way to prevent this panic, because the Accumulator interface doesn't have
- a method that allows checking whether metrics can still be sent
- a destructor that would close a channel, which could then be used to make sending the metric on a channel conditional (via a select) in here:
func (ac *accumulator) AddMetric(m telegraf.Metric) {
	m.SetTime(m.Time().Round(ac.precision))
	if m := ac.maker.MakeMetric(m); m != nil {
		ac.metrics <- m
	}
}
The snippet above is from telegraf/agent/accumulator.go, lines 79 to 84 in 842a788.
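To illustrate the second idea (a destructor that closes a channel, with the send made conditional via a select), here is a minimal sketch. It is not Telegraf code: `guardedAccumulator`, `metric`, `Close`, and the `bool` return value are all hypothetical names chosen for illustration, and the real `AddMetric` returns nothing. The sketch assumes the `metrics` channel itself is never closed while producers may still be running, because a send on a closed channel panics even inside a select. Shutdown is signalled on a separate `done` channel instead.

```go
package main

import (
	"fmt"
	"sync"
)

// metric is a hypothetical stand-in for telegraf.Metric.
type metric string

// guardedAccumulator is a hypothetical stand-in for agent.accumulator.
type guardedAccumulator struct {
	metrics chan metric   // data channel; not closed while producers may send
	done    chan struct{} // closed once to signal shutdown
	once    sync.Once     // makes Close safe to call more than once
}

func newGuardedAccumulator(buf int) *guardedAccumulator {
	return &guardedAccumulator{
		metrics: make(chan metric, buf),
		done:    make(chan struct{}),
	}
}

// Close signals shutdown. Producers observe it through the done channel.
func (ac *guardedAccumulator) Close() {
	ac.once.Do(func() { close(ac.done) })
}

// AddMetric reports whether the metric was handed over (true) or dropped
// because the accumulator was shut down (false).
func (ac *guardedAccumulator) AddMetric(m metric) bool {
	// Check done first so that calls made after Close has returned always
	// drop the metric, even if the buffered channel still has room.
	select {
	case <-ac.done:
		return false
	default:
	}
	// Block until the metric is accepted or shutdown is signalled.
	select {
	case ac.metrics <- m:
		return true
	case <-ac.done:
		return false
	}
}

func main() {
	ac := newGuardedAccumulator(1)
	fmt.Println(ac.AddMetric("foo")) // true: accepted before shutdown
	ac.Close()
	fmt.Println(ac.AddMetric("bar")) // false: dropped after shutdown
}
```

With a guard like this, an HTTP handler that is still running during shutdown would drop its metrics rather than panic. Whether dropping is acceptable, or whether the listener should instead wait for in-flight requests before the channel goes away, is a separate design decision.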