Eventually you will reach a state where the queue.mem is full, but no events are being removed since the ACKHandlers are blocked. These deadlocked ACKHandlers are unrecoverable. With the output down, the metrics show `active` slowly increasing while `acked` remains at 12.
"pipeline": {
"clients": 1,
"events": {
"active": 22,
"dropped": 0,
"failed": 0,
"filtered": 0,
"published": 34,
"retry": 144,
"total": 34
},
"queue": {
"acked": 12,
"max_events": 128
}
}
Here's a breakdown of a goroutine dump.
goroutine 758 [select, 48 minutes]:
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*openState).publish(0xc00060c528, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005edcb0, 0x10cb57680, 0xc00059a4e0, 0x0, 0x1, ...)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/produce.go:133 +0xe5
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackProducer).Publish(0xc00060c500, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005edcb0, 0x10cb57680, 0xc00059a4e0, 0x0, 0x1, ...)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/produce.go:88 +0x125
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).publish(0xc000a12480, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:134 +0x3bd
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Publish(0xc000a12480, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:80 +0x9d
github.com/elastic/beats/v7/filebeat/beater.(*countingClient).Publish(0xc0004a0f20, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
/Users/akroh/go/src/github.com/elastic/beats/filebeat/beater/channels.go:136 +0x7c
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).forwardEvent(0xc000a2e000, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0, 0xc000cc1cc0, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:590 +0x66
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).convertJSONToEvent(0xc000a2e000, 0x10caa5580, 0xc00059bb30, 0x13ef, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:581 +0x1f8
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).jsonFieldsType(0xc000a2e000, 0x10caa5580, 0xc00059b170, 0x1026, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:551 +0x191
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).decodeJSON(0xc000a2e000, 0xc000bd06e0, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, 0xc000b40696, 0x9, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:513 +0x14f
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).createEventsFromS3Info(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, 0xc000b40696, 0x9, 0xc000cc1cc0, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:436 +0xc88
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).handleS3Objects(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0xc00017e880, 0x1, 0x1, 0xc000118ea0, 0x0, 0x0)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:363 +0x291
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processMessage(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0x0, 0xc0009c6450, 0xc0009c6480, 0x0, 0x0, 0xc0009c64b0, 0xc0009c64e0, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:161 +0x2d4
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processor.func1(0x0, 0xc00041e7c0)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:139 +0xdd
github.com/elastic/go-concert/unison.(*MultiErrGroup).Go.func1(0xc00059a3f0, 0xc00060dae0)
/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:42 +0x64
created by github.com/elastic/go-concert/unison.(*MultiErrGroup).Go
/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:40 +0x66
goroutine 59 [semacquire, 48 minutes]:
sync.runtime_Semacquire(0xc00059a418)
/Users/akroh/.gvm/versions/go1.15.7.darwin.amd64/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc00059a410)
/Users/akroh/.gvm/versions/go1.15.7.darwin.amd64/src/sync/waitgroup.go:130 +0x65
github.com/elastic/go-concert/unison.(*MultiErrGroup).Wait(0xc00059a3f0, 0x0, 0x0, 0x0)
/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:54 +0x53
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processor(0xc000a2e000, 0xc0004de230, 0x4a, 0xc0005f7a80, 0x1, 0x1, 0x78, 0x10d5ab5e0, 0xc000a244f0, 0x10d586ea0, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:149 +0x47b
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).run(0xc000a2e000)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:123 +0x2a5
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Input).Run(0xc000a16d80, 0xc000a24170, 0xc000a26260, 0x10, 0x10cfd27b1, 0x8, 0x10cfd27b1, 0x8, 0x10cfcb939, 0x5, ...)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/input.go:72 +0x12b
github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1(0xc000a24170, 0x10cfcdb1e, 0x6, 0xc0002ce620)
/Users/akroh/go/src/github.com/elastic/beats/filebeat/input/v2/compat/compat.go:112 +0x1e8
created by github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start
/Users/akroh/go/src/github.com/elastic/beats/filebeat/input/v2/compat/compat.go:110 +0x7d
ACKs are blocked.
goroutine 90 [chan send, 1 minutes]:
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Context).done(0xc0009e2b40)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:683 +0xa5
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.newACKHandler.func1(0xe, 0xc000a8d000, 0xe, 0x100)
/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/input.go:137 +0x67
github.com/elastic/beats/v7/libbeat/common/acker.(*eventDataACKer).onACK(0xc00041e480, 0xe, 0xe)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:257 +0xd8
github.com/elastic/beats/v7/libbeat/common/acker.(*trackingACKer).ACKEvents(0xc000a222a0, 0xe)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:206 +0x262
github.com/elastic/beats/v7/libbeat/common/acker.(*clientOnlyACKer).ACKEvents(0xc0004a0e00, 0xe)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:329 +0x73
github.com/elastic/beats/v7/libbeat/common/acker.ackerList.ACKEvents(0xc0004a0e40, 0x2, 0x2, 0xe)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:294 +0x64
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).processACK(0xc000787720, 0xc0014d8180, 0xc0014d8180, 0xe)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/eventloop.go:548 +0x22d
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig(0xc0002bf1d0, 0xc0005f3eb8)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/ackloop.go:120 +0x9f
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).run(0xc0002bf1d0)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/ackloop.go:83 +0x18f
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue.func2(0xc0002c9ab0, 0xc0002bf1d0)
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/broker.go:180 +0x59
created by github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue
/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/broker.go:178 +0x3fe
The error channel used with the aws-s3 input is not properly handled, leading to deadlocked channel sends. A goroutine is started to receive from the unbuffered error channel, but this goroutine can stop while the error channel is still in use. When this happens the next sender gets blocked because the receiver has already exited. The receiver is `processKeepAlive` and the sender is an `ACKHandler`.
Eventually you will reach a state where the queue.mem is full, but no events are being removed since the ACKHandlers are blocked. These deadlocked ACKHandlers are unrecoverable. With the output down, the metrics show `active` slowly increasing while `acked` remains at 12.
Here's a breakdown of a goroutine dump.
`processMessage` is blocked on publishing.
The SQS reader thread is blocked waiting on `processMessage` to finish.
ACKs are blocked.
For confirmed bugs, please report:
`visibility_timeout/2` to pass while processing an SQS message (that's 2.5 minutes with defaults).