Skip to content

Commit 3f39dd8

Browse files
authored
Add support for ignore_inactive in filestream input (#25036)
This PR adds support for a more flexible file ignoring in `filestream` input. A new setting is introduced named `ignore_inactive`. At the moment it only supports two values: `since_last_start` and `since_first_start`. If `since_last_start` is selected, the input ignores every file that has not been updated since Filebeat has been started. If `since_first_start` is chosen files that haven't been written since Filebeat has been started the first time on a given host are ignored.
1 parent 7561201 commit 3f39dd8

8 files changed

Lines changed: 115 additions & 10 deletions

File tree

filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,11 @@ filebeat.inputs:
320320
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
321321
#ignore_older: 0
322322
323+
# Ignore files that have not been updated since the selected event.
324+
# ignore_inactive is disabled by default, so no files are ignored by setting it to "".
325+
# Available options: since_first_start, since_last_start.
326+
#ignore_inactive: ""
327+
323328
# Defines the buffer size every harvester uses when fetching the file
324329
#harvester_buffer_size: 16384
325330

filebeat/filebeat.reference.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,11 @@ filebeat.inputs:
727727
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
728728
#ignore_older: 0
729729

730+
# Ignore files that have not been updated since the selected event.
731+
# ignore_inactive is disabled by default, so no files are ignored by setting it to "".
732+
# Available options: since_first_start, since_last_start.
733+
#ignore_inactive: ""
734+
730735
# Defines the buffer size every harvester uses when fetching the file
731736
#harvester_buffer_size: 16384
732737

filebeat/input/filestream/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type config struct {
4040
CleanRemoved bool `config:"clean_removed"`
4141
HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"`
4242
IgnoreOlder time.Duration `config:"ignore_older"`
43+
IgnoreInactive ignoreInactiveType `config:"ignore_inactive"`
4344
}
4445

4546
type closerConfig struct {

filebeat/input/filestream/prospector.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,48 @@
1818
package filestream
1919

2020
import (
21+
"fmt"
2122
"time"
2223

2324
"github.com/urso/sderr"
2425

2526
loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
2627
input "github.com/elastic/beats/v7/filebeat/input/v2"
28+
"github.com/elastic/beats/v7/libbeat/beat"
2729
"github.com/elastic/beats/v7/libbeat/logp"
2830
"github.com/elastic/go-concert/unison"
2931
)
3032

33+
type ignoreInactiveType uint8
34+
3135
const (
32-
prospectorDebugKey = "file_prospector"
36+
InvalidIgnoreInactive = iota
37+
IgnoreInactiveSinceLastStart
38+
IgnoreInactiveSinceFirstStart
39+
40+
ignoreInactiveSinceLastStartStr = "since_last_start"
41+
ignoreInactiveSinceFirstStartStr = "since_first_start"
42+
prospectorDebugKey = "file_prospector"
43+
)
44+
45+
var (
46+
ignoreInactiveSettings = map[string]ignoreInactiveType{
47+
ignoreInactiveSinceLastStartStr: IgnoreInactiveSinceLastStart,
48+
ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart,
49+
}
3350
)
3451

3552
// fileProspector implements the Prospector interface.
3653
// It contains a file scanner which returns file system events.
3754
// The FS events then trigger either new Harvester runs or updates
3855
// the statestore.
3956
type fileProspector struct {
40-
filewatcher loginp.FSWatcher
41-
identifier fileIdentifier
42-
ignoreOlder time.Duration
43-
cleanRemoved bool
44-
stateChangeCloser stateChangeCloserConfig
57+
filewatcher loginp.FSWatcher
58+
identifier fileIdentifier
59+
ignoreOlder time.Duration
60+
ignoreInactiveSince ignoreInactiveType
61+
cleanRemoved bool
62+
stateChangeCloser stateChangeCloserConfig
4563
}
4664

4765
func (p *fileProspector) Init(cleaner loginp.ProspectorCleaner) error {
@@ -101,6 +119,8 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
101119
})
102120

103121
tg.Go(func() error {
122+
ignoreInactiveSince := getIgnoreSince(p.ignoreInactiveSince, ctx.Agent)
123+
104124
for ctx.Cancelation.Err() == nil {
105125
fe := p.filewatcher.Event()
106126

@@ -130,6 +150,11 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
130150
break
131151
}
132152
}
153+
if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 {
154+
log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath)
155+
break
156+
}
157+
133158
hg.Start(ctx, src)
134159

135160
case loginp.OpTruncate:
@@ -217,3 +242,23 @@ func (p *fileProspector) stopHarvesterGroup(log *logp.Logger, hg loginp.Harveste
217242
func (p *fileProspector) Test() error {
218243
panic("TODO: implement me")
219244
}
245+
246+
func getIgnoreSince(t ignoreInactiveType, info beat.Info) time.Time {
247+
switch t {
248+
case IgnoreInactiveSinceLastStart:
249+
return info.StartTime
250+
case IgnoreInactiveSinceFirstStart:
251+
return info.FirstStart
252+
default:
253+
return time.Time{}
254+
}
255+
}
256+
257+
func (t *ignoreInactiveType) Unpack(v string) error {
258+
val, ok := ignoreInactiveSettings[v]
259+
if !ok {
260+
return fmt.Errorf("invalid ignore_inactive setting: %s", v)
261+
}
262+
*t = val
263+
return nil
264+
}

libbeat/beat/info.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717

1818
package beat
1919

20-
import "github.com/gofrs/uuid"
20+
import (
21+
"time"
22+
23+
"github.com/gofrs/uuid"
24+
)
2125

2226
// Info stores a beats instance meta data.
2327
type Info struct {
@@ -29,6 +33,8 @@ type Info struct {
2933
Hostname string // hostname
3034
ID uuid.UUID // ID assigned to beat machine
3135
EphemeralID uuid.UUID // ID assigned to beat process invocation (PID)
36+
FirstStart time.Time // The time of the first start of the Beat.
37+
StartTime time.Time // The time of last start of the Beat. Updated when the Beat is started or restarted.
3238

3339
// Monitoring-related fields
3440
Monitoring struct {

libbeat/cmd/instance/beat.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool) (*Beat, error) {
239239
Name: hostname,
240240
Hostname: hostname,
241241
ID: id,
242+
FirstStart: time.Now(),
243+
StartTime: time.Now(),
242244
EphemeralID: metrics.EphemeralID(),
243245
},
244246
Fields: fields,
@@ -695,7 +697,8 @@ func (b *Beat) configure(settings Settings) error {
695697

696698
func (b *Beat) loadMeta(metaPath string) error {
697699
type meta struct {
698-
UUID uuid.UUID `json:"uuid"`
700+
UUID uuid.UUID `json:"uuid"`
701+
FirstStart time.Time `json:"first_start"`
699702
}
700703

701704
logp.Debug("beat", "Beat metadata path: %v", metaPath)
@@ -713,14 +716,21 @@ func (b *Beat) loadMeta(metaPath string) error {
713716
}
714717

715718
f.Close()
719+
720+
if !m.FirstStart.IsZero() {
721+
b.Info.FirstStart = m.FirstStart
722+
}
716723
valid := m.UUID != uuid.Nil
717724
if valid {
718725
b.Info.ID = m.UUID
726+
}
727+
728+
if valid && !m.FirstStart.IsZero() {
719729
return nil
720730
}
721731
}
722732

723-
// file does not exist or ID is invalid, let's create a new one
733+
// file does not exist or ID is invalid or first start time is not defined, let's create a new one
724734

725735
// write temporary file first
726736
tempFile := metaPath + ".new"
@@ -729,7 +739,7 @@ func (b *Beat) loadMeta(metaPath string) error {
729739
return fmt.Errorf("Failed to create Beat meta file: %s", err)
730740
}
731741

732-
encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID})
742+
encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
733743
err = f.Sync()
734744
if err != nil {
735745
return fmt.Errorf("Beat meta file failed to write: %s", err)

libbeat/cmd/instance/beat_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,31 @@ func TestEmptyMetaJson(t *testing.T) {
114114
assert.Equal(t, nil, err, "Unable to load meta file properly")
115115
assert.NotEqual(t, uuid.Nil, b.Info.ID, "Beats UUID is not set")
116116
}
117+
118+
func TestMetaJsonWithTimestamp(t *testing.T) {
119+
firstBeat, err := NewBeat("filebeat", "testidx", "0.9", false)
120+
if err != nil {
121+
panic(err)
122+
}
123+
firstStart := firstBeat.Info.FirstStart
124+
125+
metaFile, err := ioutil.TempFile("../test", "meta.json")
126+
assert.Equal(t, nil, err, "Unable to create temporary meta file")
127+
128+
metaPath := metaFile.Name()
129+
metaFile.Close()
130+
defer os.Remove(metaPath)
131+
132+
err = firstBeat.loadMeta(metaPath)
133+
assert.Equal(t, nil, err, "Unable to load meta file properly")
134+
135+
secondBeat, err := NewBeat("filebeat", "testidx", "0.9", false)
136+
if err != nil {
137+
panic(err)
138+
}
139+
assert.False(t, firstStart.Equal(secondBeat.Info.FirstStart), "Before meta.json is loaded, first start must be different")
140+
secondBeat.loadMeta(metaPath)
141+
142+
assert.Equal(t, nil, err, "Unable to load meta file properly")
143+
assert.True(t, firstStart.Equal(secondBeat.Info.FirstStart), "Cannot load first start")
144+
}

x-pack/filebeat/filebeat.reference.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2578,6 +2578,11 @@ filebeat.inputs:
25782578
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
25792579
#ignore_older: 0
25802580

2581+
# Ignore files that have not been updated since the selected event.
2582+
# ignore_inactive is disabled by default, so no files are ignored by setting it to "".
2583+
# Available options: since_first_start, since_last_start.
2584+
#ignore_inactive: ""
2585+
25812586
# Defines the buffer size every harvester uses when fetching the file
25822587
#harvester_buffer_size: 16384
25832588

0 commit comments

Comments
 (0)