Skip to content

Commit c558984

Browse files
Steffen Sieringtsg
authored andcommitted
Fix filebeat registry meta being nil vs empty (#7632)
Filebeat introduces a meta field to registry entries in 6.3.1. The meta field is used to distuingish different log streams in docker files. For other input types the meta field must be null. Unfortunately the input loader did initialize the meta field with an empty dictionary. This leads to failing matches of old and new registry entries. Due to the match failing, old entries will not be removed, and filebeat will handle all files as new files on startup (old logs are send again). Users will observe duplicate entries in the reigstry file. One entry with "meta": null and one entry with "meta": {}. The entry with "meta": {} will be used by filebeat. The null-entry will not be used by filebeat, but is kept in the registry file, cause it has now active owner (yet). Improvements provided by this PR: * when matching states consider an empty map and a null-map to be equivalent * update input loader to create a null map for old state -> registry entries will be compatible on upgrade * Add checks in critical places replacing an empty map with a null-map * Add support to fix registry entries on load. states from corrupted 6.3.1 files will be merged into one single state on load * introduce unit tests for loading different registry formats * introduce system tests validating output and registry when upgrading filebeat from an older version Closes: #7634
1 parent 5eaafff commit c558984

13 files changed

Lines changed: 401 additions & 10 deletions

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
103103
- Fix offset field pointing at end of a line. {issue}6514[6514]
104104
- Fix an issue when parsing ISO8601 dates with timezone definition {issue}7367[7367]
105105
- Fix Grok pattern of MongoDB module. {pull}7568[7568]
106+
- Fix registry duplicates and log resending on upgrade. {issue}7634[7634]
106107

107108
*Heartbeat*
108109
- Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616]

filebeat/input/docker/input.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ func NewInput(
7373

7474
// Add stream to meta to ensure different state per stream
7575
if config.Containers.Stream != "all" {
76+
if context.Meta == nil {
77+
context.Meta = map[string]string{}
78+
}
7679
context.Meta["stream"] = config.Containers.Stream
7780
}
7881

filebeat/input/file/state.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ type State struct {
4444

4545
// NewState creates a new file state
4646
func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State {
47+
if len(meta) == 0 {
48+
meta = nil
49+
}
4750
return State{
4851
Fileinfo: fileInfo,
4952
Source: path,
@@ -60,7 +63,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin
6063
func (s *State) ID() string {
6164
// Generate id on first request. This is needed as id is not set when converting back from json
6265
if s.Id == "" {
63-
if s.Meta == nil {
66+
if len(s.Meta) == 0 {
6467
s.Id = s.FileStateOS.String()
6568
} else {
6669
hashValue, _ := hashstructure.Hash(s.Meta, nil)
@@ -91,6 +94,6 @@ func (s *State) IsEqual(c *State) bool {
9194
func (s *State) IsEmpty() bool {
9295
return s.FileStateOS == file.StateOS{} &&
9396
s.Source == "" &&
94-
s.Meta == nil &&
97+
len(s.Meta) == 0 &&
9598
s.Timestamp.IsZero()
9699
}

filebeat/input/input.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func New(
9696
Done: input.done,
9797
BeatDone: input.beatDone,
9898
DynamicFields: dynFields,
99-
Meta: map[string]string{},
99+
Meta: nil,
100100
}
101101
var ipt Input
102102
ipt, err = f(conf, outlet, context)

filebeat/input/log/input.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ func NewInput(
9393
// can be forwarded correctly to the registrar.
9494
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone)
9595

96+
meta := context.Meta
97+
if len(meta) == 0 {
98+
meta = nil
99+
}
100+
96101
p := &Input{
97102
config: defaultConfig,
98103
cfg: cfg,
@@ -101,7 +106,7 @@ func NewInput(
101106
stateOutlet: stateOut,
102107
states: file.NewStates(),
103108
done: context.Done,
104-
meta: context.Meta,
109+
meta: meta,
105110
}
106111

107112
if err := cfg.Unpack(&p.config); err != nil {
@@ -687,6 +692,10 @@ func (p *Input) updateState(state file.State) error {
687692
state.TTL = p.config.CleanInactive
688693
}
689694

695+
if len(state.Meta) == 0 {
696+
state.Meta = nil
697+
}
698+
690699
// Update first internal state
691700
p.states.Update(state)
692701

filebeat/registrar/registrar.go

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package registrar
2020
import (
2121
"encoding/json"
2222
"fmt"
23+
"io"
2324
"os"
2425
"path/filepath"
2526
"sync"
@@ -132,20 +133,111 @@ func (r *Registrar) loadStates() error {
132133

133134
logp.Info("Loading registrar data from %s", r.registryFile)
134135

135-
decoder := json.NewDecoder(f)
136-
states := []file.State{}
137-
err = decoder.Decode(&states)
136+
states, err := readStatesFrom(f)
138137
if err != nil {
139-
return fmt.Errorf("Error decoding states: %s", err)
138+
return err
140139
}
141-
142-
states = resetStates(states)
143140
r.states.SetStates(states)
144141
logp.Info("States Loaded from registrar: %+v", len(states))
145142

146143
return nil
147144
}
148145

146+
func readStatesFrom(in io.Reader) ([]file.State, error) {
147+
states := []file.State{}
148+
decoder := json.NewDecoder(in)
149+
if err := decoder.Decode(&states); err != nil {
150+
return nil, fmt.Errorf("Error decoding states: %s", err)
151+
}
152+
153+
states = fixStates(states)
154+
states = resetStates(states)
155+
return states, nil
156+
}
157+
158+
// fixStates cleans up the regsitry states when updating from an older version
159+
// of filebeat potentially writing invalid entries.
160+
func fixStates(states []file.State) []file.State {
161+
if len(states) == 0 {
162+
return states
163+
}
164+
165+
// we use a map of states here, so to identify and merge duplicate entries.
166+
idx := map[string]*file.State{}
167+
for i := range states {
168+
state := &states[i]
169+
fixState(state)
170+
171+
id := state.ID()
172+
old, exists := idx[id]
173+
if !exists {
174+
idx[id] = state
175+
} else {
176+
mergeStates(old, state) // overwrite the entry in 'old'
177+
}
178+
}
179+
180+
if len(idx) == len(states) {
181+
return states
182+
}
183+
184+
i := 0
185+
newStates := make([]file.State, len(idx))
186+
for _, state := range idx {
187+
newStates[i] = *state
188+
i++
189+
}
190+
return newStates
191+
}
192+
193+
// fixState updates a read state to fullfil required invariantes:
194+
// - "Meta" must be nil if len(Meta) == 0
195+
func fixState(st *file.State) {
196+
if len(st.Meta) == 0 {
197+
st.Meta = nil
198+
}
199+
}
200+
201+
// mergeStates merges 2 states by trying to determine the 'newer' state.
202+
// The st state is overwritten with the updated fields.
203+
func mergeStates(st, other *file.State) {
204+
st.Finished = st.Finished || other.Finished
205+
if st.Offset < other.Offset { // always select the higher offset
206+
st.Offset = other.Offset
207+
}
208+
209+
// update file meta-data. As these are updated concurrently by the
210+
// prospectors, select the newer state based on the update timestamp.
211+
var meta, metaOld, metaNew map[string]string
212+
if st.Timestamp.Before(other.Timestamp) {
213+
st.Source = other.Source
214+
st.Timestamp = other.Timestamp
215+
st.TTL = other.TTL
216+
st.FileStateOS = other.FileStateOS
217+
218+
metaOld, metaNew = st.Meta, other.Meta
219+
} else {
220+
metaOld, metaNew = other.Meta, st.Meta
221+
}
222+
223+
if len(metaOld) == 0 || len(metaNew) == 0 {
224+
meta = metaNew
225+
} else {
226+
meta = map[string]string{}
227+
for k, v := range metaOld {
228+
meta[k] = v
229+
}
230+
for k, v := range metaNew {
231+
meta[k] = v
232+
}
233+
}
234+
235+
if len(meta) == 0 {
236+
meta = nil
237+
}
238+
st.Meta = meta
239+
}
240+
149241
// resetStates sets all states to finished and disable TTL on restart
150242
// For all states covered by an input, TTL will be overwritten with the input value
151243
func resetStates(states []file.State) []file.State {

0 commit comments

Comments
 (0)