Skip to content

Commit 57c9891

Browse files
authored
Stop waiting for signals on closed outleters (#11263)
Outleters start a goroutine to handle the finalization of filebeat. If the outleter is closed by other means the goroutine will be kept running even if it has nothing to do, leaking goroutines. Stop this goroutine if the outleter is closed.
1 parent 57f8b5c commit 57c9891

5 files changed

Lines changed: 20 additions & 2 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
170170
- Fix errors in filebeat Zeek dashboard and README files. Add notice.log support. {pull}10916[10916]
171171
- Fix a bug when converting NetFlow fields to snake_case. {pull}10950[10950]
172172
- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test case. {issue}11004[11004] {pull}11105[11105]
173+
- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263]
173174
- Fix issue preventing docker container events to be stored if the container has a network interface without ip address. {issue}11225[11225] {pull}11247[11247]
174175
- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test
175176
case. {issue}11004[11004] {pull}11105[11105]

filebeat/channel/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
3232
// Outleter is the outlet for an input
3333
type Outleter interface {
3434
Close() error
35+
Done() <-chan struct{}
3536
OnEvent(data *util.Data) bool
3637
}

filebeat/channel/outlet.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,32 @@ type outlet struct {
2727
wg eventCounter
2828
client beat.Client
2929
isOpen atomic.Bool
30+
done chan struct{}
3031
}
3132

3233
func newOutlet(client beat.Client, wg eventCounter) *outlet {
3334
o := &outlet{
3435
wg: wg,
3536
client: client,
3637
isOpen: atomic.MakeBool(true),
38+
done: make(chan struct{}),
3739
}
3840
return o
3941
}
4042

4143
func (o *outlet) Close() error {
4244
isOpen := o.isOpen.Swap(false)
4345
if isOpen {
46+
close(o.done)
4447
return o.client.Close()
4548
}
4649
return nil
4750
}
4851

52+
func (o *outlet) Done() <-chan struct{} {
53+
return o.done
54+
}
55+
4956
func (o *outlet) OnEvent(d *util.Data) bool {
5057
if !o.isOpen.Load() {
5158
return false

filebeat/channel/util.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ func (o *subOutlet) Close() error {
7171
return nil
7272
}
7373

74+
func (o *subOutlet) Done() <-chan struct{} {
75+
return o.done
76+
}
77+
7478
func (o *subOutlet) OnEvent(d *util.Data) bool {
7579

7680
o.mutex.Lock()
@@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool {
114118
func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter {
115119
if sig != nil {
116120
go func() {
117-
<-sig
118-
outlet.Close()
121+
select {
122+
case <-outlet.Done():
123+
return
124+
case <-sig:
125+
outlet.Close()
126+
}
119127
}()
120128
}
121129
return outlet

filebeat/input/log/input_other_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,4 @@ type TestOutlet struct{}
169169

170170
func (o TestOutlet) OnEvent(event *util.Data) bool { return true }
171171
func (o TestOutlet) Close() error { return nil }
172+
func (o TestOutlet) Done() <-chan struct{} { return nil }

0 commit comments

Comments
 (0)