Skip to content

Commit 9653105

Browse files
authored
Fix memory leak in Filebeat pipeline acker (#12063)
* Fix memory leak in Filebeat pipeline acker Before this change acker goroutine was kept forever as processed events count was not correctly updated. Filebeat sends an empty event to update file states, this event is not published, but treated as dropped, without updating counters. This change makes sures that `a.events` count gets updated for dropped events also, so the acker gets closed after all ACKs happen.
1 parent 82e7eec commit 9653105

2 files changed

Lines changed: 9 additions & 1 deletion

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
8787
- Fix `add_docker_metadata` source matching, using `log.file.path` field now. {pull}11577[11577]
8888
- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591]
8989
- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524]
90+
- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]
9091

9192
*Heartbeat*
9293

libbeat/publisher/pipeline/acker.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,19 +139,26 @@ func (a *gapCountACK) ackLoop() {
139139
case <-a.done:
140140
closing = true
141141
a.done = nil
142+
if a.events.Load() == 0 {
143+
// stop worker, if all events accounted for have been ACKed.
144+
// If new events are added after this acker won't handle them, which may
145+
// result in duplicates
146+
return
147+
}
142148

143149
case <-a.pipeline.ackDone:
144150
return
145151

146152
case n := <-acks:
147153
empty := a.handleACK(n)
148154
if empty && closing && a.events.Load() == 0 {
149-
// stop worker, iff all events accounted for have been ACKed
155+
// stop worker, if and only if all events accounted for have been ACKed
150156
return
151157
}
152158

153159
case <-drop:
154160
// TODO: accumulate multiple drop events + flush count with timer
161+
a.events.Sub(1)
155162
a.fn(1, 0)
156163
}
157164
}

0 commit comments

Comments
 (0)