Skip to content

Commit af6222d

Browse files
authored
Filebeat: Fix random error on harvester close (#21048)
This fixes a race condition when a harvester is closed at the same time that its source file size is being calculated. Before this fix, `Error updating file size` errors are randomly printed, because the file handle becomes closed inside the os.Stat call.
1 parent 54ea284 commit af6222d

2 files changed

Lines changed: 10 additions & 2 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
260260
- Update documentation in the azure module filebeat. {pull}20815[20815]
261261
- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908]
262262
- Remove wrongly mapped `tls.client.server_name` from `fortinet/firewall` fileset. {pull}20983[20983]
263+
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]
263264

264265
*Heartbeat*
265266

filebeat/input/log/harvester.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type Harvester struct {
8383

8484
// shutdown handling
8585
done chan struct{}
86+
doneWg *sync.WaitGroup
8687
stopOnce sync.Once
8788
stopWg *sync.WaitGroup
8889
stopLock sync.Mutex
@@ -138,6 +139,7 @@ func NewHarvester(
138139
publishState: publishState,
139140
done: make(chan struct{}),
140141
stopWg: &sync.WaitGroup{},
142+
doneWg: &sync.WaitGroup{},
141143
id: id,
142144
outletFactory: outletFactory,
143145
}
@@ -299,7 +301,11 @@ func (h *Harvester) Run() error {
299301

300302
logp.Info("Harvester started for file: %s", h.state.Source)
301303

302-
go h.monitorFileSize()
304+
h.doneWg.Add(1)
305+
go func() {
306+
h.monitorFileSize()
307+
h.doneWg.Done()
308+
}()
303309

304310
for {
305311
select {
@@ -378,7 +384,8 @@ func (h *Harvester) monitorFileSize() {
378384
func (h *Harvester) stop() {
379385
h.stopOnce.Do(func() {
380386
close(h.done)
381-
387+
// Wait for goroutines monitoring h.done to terminate before closing source.
388+
h.doneWg.Wait()
382389
filesMetrics.Remove(h.id.String())
383390
})
384391
}

0 commit comments

Comments
 (0)