|
18 | 18 | package filestream |
19 | 19 |
|
20 | 20 | import ( |
| 21 | + "fmt" |
21 | 22 | "time" |
22 | 23 |
|
23 | 24 | "github.com/urso/sderr" |
24 | 25 |
|
25 | 26 | loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" |
26 | 27 | input "github.com/elastic/beats/v7/filebeat/input/v2" |
| 28 | + "github.com/elastic/beats/v7/libbeat/beat" |
27 | 29 | "github.com/elastic/beats/v7/libbeat/logp" |
28 | 30 | "github.com/elastic/go-concert/unison" |
29 | 31 | ) |
30 | 32 |
|
| 33 | +type ignoreInactiveType uint8 |
| 34 | + |
31 | 35 | const ( |
32 | | - prospectorDebugKey = "file_prospector" |
| 36 | + InvalidIgnoreInactive = iota |
| 37 | + IgnoreInactiveSinceLastStart |
| 38 | + IgnoreInactiveSinceFirstStart |
| 39 | + |
| 40 | + ignoreInactiveSinceLastStartStr = "since_last_start" |
| 41 | + ignoreInactiveSinceFirstStartStr = "since_first_start" |
| 42 | + prospectorDebugKey = "file_prospector" |
| 43 | +) |
| 44 | + |
| 45 | +var ( |
| 46 | + ignoreInactiveSettings = map[string]ignoreInactiveType{ |
| 47 | + ignoreInactiveSinceLastStartStr: IgnoreInactiveSinceLastStart, |
| 48 | + ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart, |
| 49 | + } |
33 | 50 | ) |
34 | 51 |
|
35 | 52 | // fileProspector implements the Prospector interface. |
36 | 53 | // It contains a file scanner which returns file system events. |
37 | 54 | // The FS events then trigger either new Harvester runs or updates |
38 | 55 | // the statestore. |
39 | 56 | type fileProspector struct { |
40 | | - filewatcher loginp.FSWatcher |
41 | | - identifier fileIdentifier |
42 | | - ignoreOlder time.Duration |
43 | | - cleanRemoved bool |
44 | | - stateChangeCloser stateChangeCloserConfig |
| 57 | + filewatcher loginp.FSWatcher |
| 58 | + identifier fileIdentifier |
| 59 | + ignoreOlder time.Duration |
| 60 | + ignoreInactiveSince ignoreInactiveType |
| 61 | + cleanRemoved bool |
| 62 | + stateChangeCloser stateChangeCloserConfig |
45 | 63 | } |
46 | 64 |
|
47 | 65 | func (p *fileProspector) Init(cleaner loginp.ProspectorCleaner) error { |
@@ -101,6 +119,8 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h |
101 | 119 | }) |
102 | 120 |
|
103 | 121 | tg.Go(func() error { |
| 122 | + ignoreInactiveSince := getIgnoreSince(p.ignoreInactiveSince, ctx.Agent) |
| 123 | + |
104 | 124 | for ctx.Cancelation.Err() == nil { |
105 | 125 | fe := p.filewatcher.Event() |
106 | 126 |
|
@@ -130,6 +150,11 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h |
130 | 150 | break |
131 | 151 | } |
132 | 152 | } |
| 153 | + if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 { |
| 154 | + log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath) |
| 155 | + break |
| 156 | + } |
| 157 | + |
133 | 158 | hg.Start(ctx, src) |
134 | 159 |
|
135 | 160 | case loginp.OpTruncate: |
@@ -217,3 +242,23 @@ func (p *fileProspector) stopHarvesterGroup(log *logp.Logger, hg loginp.Harveste |
217 | 242 | func (p *fileProspector) Test() error { |
218 | 243 | panic("TODO: implement me") |
219 | 244 | } |
| 245 | + |
| 246 | +func getIgnoreSince(t ignoreInactiveType, info beat.Info) time.Time { |
| 247 | + switch t { |
| 248 | + case IgnoreInactiveSinceLastStart: |
| 249 | + return info.StartTime |
| 250 | + case IgnoreInactiveSinceFirstStart: |
| 251 | + return info.FirstStart |
| 252 | + default: |
| 253 | + return time.Time{} |
| 254 | + } |
| 255 | +} |
| 256 | + |
| 257 | +func (t *ignoreInactiveType) Unpack(v string) error { |
| 258 | + val, ok := ignoreInactiveSettings[v] |
| 259 | + if !ok { |
| 260 | + return fmt.Errorf("invalid ignore_inactive setting: %s", v) |
| 261 | + } |
| 262 | + *t = val |
| 263 | + return nil |
| 264 | +} |
0 commit comments