[Filebeat] aws-s3 input visibility timeout extensions lead to deadlocked ACKHandlers #25750

@andrewkroh

Description

The error channel used with the aws-s3 input is not properly handled, which leads to deadlocked channel sends. A goroutine is started to receive from the unbuffered error channel, but this goroutine can stop while the error channel is still in use. When this happens, the next sender blocks forever because the receiver has already exited. The receiver is processKeepAlive and the sender is an ACKHandler.

Eventually you reach a state where the memory queue (queue.mem) is full, but no events are being removed because the ACKHandlers are blocked. These deadlocked ACKHandlers are unrecoverable. With the output down, the pipeline metrics show active slowly increasing while acked remains at 12.

    "pipeline": {
      "clients": 1,
      "events": {
        "active": 22,
        "dropped": 0,
        "failed": 0,
        "filtered": 0,
        "published": 34,
        "retry": 144,
        "total": 34
      },
      "queue": {
        "acked": 12,
        "max_events": 128
      }
    }

Here's a breakdown of a goroutine dump.

processMessage is blocked on publishing.

goroutine 758 [select, 48 minutes]:
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*openState).publish(0xc00060c528, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005edcb0, 0x10cb57680, 0xc00059a4e0, 0x0, 0x1, ...)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/produce.go:133 +0xe5
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackProducer).Publish(0xc00060c500, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005edcb0, 0x10cb57680, 0xc00059a4e0, 0x0, 0x1, ...)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/produce.go:88 +0x125
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).publish(0xc000a12480, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:134 +0x3bd
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Publish(0xc000a12480, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/pipeline/client.go:80 +0x9d
github.com/elastic/beats/v7/filebeat/beater.(*countingClient).Publish(0xc0004a0f20, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/filebeat/beater/channels.go:136 +0x7c
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).forwardEvent(0xc000a2e000, 0xf6f2430, 0xed834f0c1, 0x0, 0xc0005edc80, 0xc0005ed9e0, 0x10cb57680, 0xc00059a4e0, 0x0, 0xc000cc1cc0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:590 +0x66
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).convertJSONToEvent(0xc000a2e000, 0x10caa5580, 0xc00059bb30, 0x13ef, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:581 +0x1f8
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).jsonFieldsType(0xc000a2e000, 0x10caa5580, 0xc00059b170, 0x1026, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:551 +0x191
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).decodeJSON(0xc000a2e000, 0xc000bd06e0, 0xc000354200, 0xa, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, 0xc000b40696, 0x9, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:513 +0x14f
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).createEventsFromS3Info(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0xc000b406a0, 0x8, 0xc00080aa20, 0x89, 0xc000b40696, 0x9, 0xc000cc1cc0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:436 +0xc88
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).handleS3Objects(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0xc00017e880, 0x1, 0x1, 0xc000118ea0, 0x0, 0x0)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:363 +0x291
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processMessage(0xc000a2e000, 0x10d5ab5e0, 0xc000a244f0, 0x0, 0xc0009c6450, 0xc0009c6480, 0x0, 0x0, 0xc0009c64b0, 0xc0009c64e0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:161 +0x2d4
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processor.func1(0x0, 0xc00041e7c0)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:139 +0xdd
github.com/elastic/go-concert/unison.(*MultiErrGroup).Go.func1(0xc00059a3f0, 0xc00060dae0)
	/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:42 +0x64
created by github.com/elastic/go-concert/unison.(*MultiErrGroup).Go
	/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:40 +0x66

The SQS reader thread is blocked waiting on processMessage to finish.

goroutine 59 [semacquire, 48 minutes]:
sync.runtime_Semacquire(0xc00059a418)
	/Users/akroh/.gvm/versions/go1.15.7.darwin.amd64/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc00059a410)
	/Users/akroh/.gvm/versions/go1.15.7.darwin.amd64/src/sync/waitgroup.go:130 +0x65
github.com/elastic/go-concert/unison.(*MultiErrGroup).Wait(0xc00059a3f0, 0x0, 0x0, 0x0)
	/Users/akroh/go/pkg/mod/github.com/elastic/go-concert@v0.1.0/unison/multierrgroup.go:54 +0x53
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).processor(0xc000a2e000, 0xc0004de230, 0x4a, 0xc0005f7a80, 0x1, 0x1, 0x78, 0x10d5ab5e0, 0xc000a244f0, 0x10d586ea0, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:149 +0x47b
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Collector).run(0xc000a2e000)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:123 +0x2a5
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Input).Run(0xc000a16d80, 0xc000a24170, 0xc000a26260, 0x10, 0x10cfd27b1, 0x8, 0x10cfd27b1, 0x8, 0x10cfcb939, 0x5, ...)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/input.go:72 +0x12b
github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1(0xc000a24170, 0x10cfcdb1e, 0x6, 0xc0002ce620)
	/Users/akroh/go/src/github.com/elastic/beats/filebeat/input/v2/compat/compat.go:112 +0x1e8
created by github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start
	/Users/akroh/go/src/github.com/elastic/beats/filebeat/input/v2/compat/compat.go:110 +0x7d

ACKs are blocked.

goroutine 90 [chan send, 1 minutes]:
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.(*s3Context).done(0xc0009e2b40)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/collector.go:683 +0xa5
github.com/elastic/beats/v7/x-pack/filebeat/input/awss3.newACKHandler.func1(0xe, 0xc000a8d000, 0xe, 0x100)
	/Users/akroh/go/src/github.com/elastic/beats/x-pack/filebeat/input/awss3/input.go:137 +0x67
github.com/elastic/beats/v7/libbeat/common/acker.(*eventDataACKer).onACK(0xc00041e480, 0xe, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:257 +0xd8
github.com/elastic/beats/v7/libbeat/common/acker.(*trackingACKer).ACKEvents(0xc000a222a0, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:206 +0x262
github.com/elastic/beats/v7/libbeat/common/acker.(*clientOnlyACKer).ACKEvents(0xc0004a0e00, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:329 +0x73
github.com/elastic/beats/v7/libbeat/common/acker.ackerList.ACKEvents(0xc0004a0e40, 0x2, 0x2, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/common/acker/acker.go:294 +0x64
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*bufferingEventLoop).processACK(0xc000787720, 0xc0014d8180, 0xc0014d8180, 0xe)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/eventloop.go:548 +0x22d
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).handleBatchSig(0xc0002bf1d0, 0xc0005f3eb8)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/ackloop.go:120 +0x9f
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.(*ackLoop).run(0xc0002bf1d0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/ackloop.go:83 +0x18f
github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue.func2(0xc0002c9ab0, 0xc0002bf1d0)
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/broker.go:180 +0x59
created by github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue.NewQueue
	/Users/akroh/go/src/github.com/elastic/beats/libbeat/publisher/queue/memqueue/broker.go:178 +0x3fe

For confirmed bugs, please report:

  • Version: 7.12.1 (testing with 5f242e3)
  • Operating System: All
  • Steps to Reproduce: Block the output or slow down processing enough for visibility_timeout/2 to pass while processing an SQS message (that's 2.5 minutes with defaults).
