Auto reload config mechanism leads to memory and goroutine leak in redis input plugin #11128

Description
The Redis input plugin uses pool.ConnPool for communication when gathering, with the default connection settings IdleTimeout=5m and IdleCheckFrequency=1m. The Redis client (go-redis v6.15.9) starts a goroutine per pool that, at IdleCheckFrequency intervals, reaps connections that have been idle longer than IdleTimeout.
```go
// internal/pool/pool.go (go-redis v6.15.9)
func NewConnPool(opt *Options) *ConnPool {
	p := &ConnPool{
		opt:       opt,
		queue:     make(chan struct{}, opt.PoolSize),
		conns:     make([]*Conn, 0, opt.PoolSize),
		idleConns: make([]*Conn, 0, opt.PoolSize),
	}
	for i := 0; i < opt.MinIdleConns; i++ {
		p.checkMinIdleConns()
	}
	// Starts a goroutine that reaps expired connections in the pool.
	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
		go p.reaper(opt.IdleCheckFrequency)
	}
	return p
}
```
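The reaper goroutine above runs until the pool is closed. A minimal self-contained sketch of the leak pattern (using a hypothetical `fakePool` type, not go-redis itself): recreating the pool on every reload without calling `Close` leaves one live reaper goroutine per discarded pool.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// fakePool mimics go-redis's pool.ConnPool (names are hypothetical):
// creating the pool starts a background reaper goroutine, and only
// Close stops it.
type fakePool struct {
	done  chan struct{}
	alive *int64
}

func newFakePool(alive *int64) *fakePool {
	p := &fakePool{done: make(chan struct{}), alive: alive}
	atomic.AddInt64(alive, 1)
	go p.reaper(5 * time.Millisecond)
	return p
}

func (p *fakePool) reaper(freq time.Duration) {
	defer atomic.AddInt64(p.alive, -1)
	t := time.NewTicker(freq)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			// a real reaper would close idle connections here
		case <-p.done:
			return
		}
	}
}

func (p *fakePool) Close() { close(p.done) }

// simulateReloads recreates the pool n times without closing the old
// one, as the reload path does, and returns how many reaper goroutines
// are still alive afterwards.
func simulateReloads(n int) int64 {
	var alive int64
	for i := 0; i < n; i++ {
		_ = newFakePool(&alive) // old pool is never Close()d
	}
	time.Sleep(20 * time.Millisecond)
	return atomic.LoadInt64(&alive)
}

func main() {
	fmt.Println("leaked reapers after 5 reloads:", simulateReloads(5))
}
```

Each "reload" adds one reaper that can never exit, which matches the growth seen in the pprof output below.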
Telegraf passes a cancellable context (cancelCtx) to the "gatherLoop" method of every input plugin. When the config file changes, Telegraf cancels that context to stop all input gather loops and creates a new agent instance, which recreates the Redis plugin instances (each with a new pool.ConnPool). However, the connection pool of the previous Redis plugin instance is never closed, so the goroutine that checks for expired connections stays alive. Frequent config changes therefore lead to a massive goroutine leak.
```go
func (a *Agent) runInputs(...) {
	// ...
	acc := NewAccumulator(input, unit.dst)
	acc.SetPrecision(getPrecision(precision, interval))
	wg.Add(1)
	go func(input *models.RunningInput) {
		defer wg.Done()
		// ctx is passed in so the loop can be cancelled
		a.gatherLoop(ctx, acc, input, ticker, interval)
	}(input)
	// ...
}

// gatherLoop runs an input's gather function periodically until the
// context is done.
func (a *Agent) gatherLoop(
	ctx context.Context,
	acc telegraf.Accumulator,
	input *models.RunningInput,
	ticker Ticker,
	interval time.Duration,
) {
	defer panicRecover(input)
	for {
		select {
		case <-ticker.Elapsed():
			err := a.gatherOnce(acc, input, ticker, interval)
			if err != nil {
				acc.AddError(err)
			}
		case <-ctx.Done():
			// notified via cancelCtx when the cancel func is called
			return
		}
	}
}
```
pprof peek:

```
File: telegraf
Type: goroutine
Time: May 18, 2022 at 10:16am (CST)
Active filters:
   focus=runtime\.chanrecv|runtime\.chanrecv2
Showing nodes accounting for 14988, 97.26% of 15410 total
----------------------------------------------------------+-------------
      flat  flat%   sum%        cum   cum%   calls calls% + context
----------------------------------------------------------+-------------
                                             14987   100% |   runtime.chanrecv2 /usr/go/src/runtime/chan.go:444
                                                 1 0.0067% |   runtime.chanrecv1 /usr/go/src/runtime/chan.go:439
         0     0%     0%      14988 97.26%                 | runtime.chanrecv /usr/go/src/runtime/chan.go:576
                                             14988   100% |   runtime.gopark /usr/go/src/runtime/proc.go:336
----------------------------------------------------------+-------------
                                                 1   100% |   github.com/influxdata/telegraf/agent.(*AutoConfig).RunWithInterval /data/__qci/root-workspaces/__qci-pipeline-358598-1/agent/autoconfig.go:92
         0     0%     0%          1 0.0065%                | runtime.chanrecv1 /usr/go/src/runtime/chan.go:439
                                                 1   100% |   runtime.chanrecv /usr/go/src/runtime/chan.go:576
----------------------------------------------------------+-------------
                                             14986   100% |   github.com/go-redis/redis/internal/pool.(*ConnPool).reaper /root/go/pkg/mod/github.com/go-redis/redis@v6.15.9+incompatible/internal/pool/pool.go:449
                                                 1 0.0067% |   github.com/influxdata/telegraf/agent.(*Agent).runOutputs /data/__qci/root-workspaces/__qci-pipeline-358598-1/agent/agent.go:781
         0     0%     0%      14987 97.26%                 | runtime.chanrecv2 /usr/go/src/runtime/chan.go:444
                                             14987   100% |   runtime.chanrecv /usr/go/src/runtime/chan.go:576
----------------------------------------------------------+-------------
```