Skip to content

Commit 1c71c94

Browse files
author
Steffen Siering
authored
Add support to combine update operations (#25976)
1 parent 99bc09f commit 1c71c94

13 files changed

Lines changed: 442 additions & 97 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
859859
- Update `fortinet` ingest pipelines. {issue}22136[22136] {issue}25254[25254] {pull}24816[24816]
860860
- Use default add_locale for fortinet.firewall {issue}20300[20300] {pull}26524[26524]
861861
- Add new template functions and `value_type` parameter to `httpjson` transforms. {pull}26847[26847]
862+
- Add support to merge registry updates in the filestream input across multiple ACKed batches in case of backpressure in the registry or disk. {pull}25976[25976]
862863

863864
*Heartbeat*
864865

filebeat/input/filestream/internal/input-logfile/clean_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func TestGCStore(t *testing.T) {
145145

146146
// create pending update operation
147147
res := store.Get("test::key")
148-
op, err := createUpdateOp(store, res, "test-state-update")
148+
op, err := createUpdateOp(res, "test-state-update")
149149
require.NoError(t, err)
150150
res.Release()
151151

filebeat/input/filestream/internal/input-logfile/cursor.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@ package input_logfile
2020
// Cursor allows the input to check if cursor status has been stored
2121
// in the past and unpack the status into a custom structure.
2222
type Cursor struct {
23-
store *store
2423
resource *resource
2524
}
2625

27-
func makeCursor(store *store, res *resource) Cursor {
28-
return Cursor{store: store, resource: res}
26+
func makeCursor(res *resource) Cursor {
27+
return Cursor{resource: res}
2928
}
3029

3130
// IsNew returns true if no cursor information has been stored

filebeat/input/filestream/internal/input-logfile/cursor_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func TestCursor_IsNew(t *testing.T) {
2828
store := testOpenStore(t, "test", createSampleStore(t, nil))
2929
defer store.Release()
3030

31-
cursor := makeCursor(store, store.Get("test::key"))
31+
cursor := makeCursor(store.Get("test::key"))
3232
require.True(t, cursor.IsNew())
3333
})
3434

@@ -38,7 +38,7 @@ func TestCursor_IsNew(t *testing.T) {
3838
}))
3939
defer store.Release()
4040

41-
cursor := makeCursor(store, store.Get("test::key"))
41+
cursor := makeCursor(store.Get("test::key"))
4242
require.True(t, cursor.IsNew())
4343
})
4444

@@ -48,7 +48,7 @@ func TestCursor_IsNew(t *testing.T) {
4848
}))
4949
defer store.Release()
5050

51-
cursor := makeCursor(store, store.Get("test::key"))
51+
cursor := makeCursor(store.Get("test::key"))
5252
require.False(t, cursor.IsNew())
5353
})
5454

@@ -59,11 +59,11 @@ func TestCursor_IsNew(t *testing.T) {
5959
defer store.Release()
6060

6161
res := store.Get("test::key")
62-
op, err := createUpdateOp(store, res, "test-state-update")
62+
op, err := createUpdateOp(res, "test-state-update")
6363
require.NoError(t, err)
6464
defer op.done(1)
6565

66-
cursor := makeCursor(store, res)
66+
cursor := makeCursor(res)
6767
require.False(t, cursor.IsNew())
6868
})
6969
}
@@ -74,7 +74,7 @@ func TestCursor_Unpack(t *testing.T) {
7474
defer store.Release()
7575

7676
var st string
77-
cursor := makeCursor(store, store.Get("test::key"))
77+
cursor := makeCursor(store.Get("test::key"))
7878

7979
require.NoError(t, cursor.Unpack(&st))
8080
require.Equal(t, "", st)
@@ -87,7 +87,7 @@ func TestCursor_Unpack(t *testing.T) {
8787
defer store.Release()
8888

8989
var st struct{ A uint }
90-
cursor := makeCursor(store, store.Get("test::key"))
90+
cursor := makeCursor(store.Get("test::key"))
9191
require.Error(t, cursor.Unpack(&st))
9292
})
9393

@@ -98,7 +98,7 @@ func TestCursor_Unpack(t *testing.T) {
9898
defer store.Release()
9999

100100
var st string
101-
cursor := makeCursor(store, store.Get("test::key"))
101+
cursor := makeCursor(store.Get("test::key"))
102102

103103
require.NoError(t, cursor.Unpack(&st))
104104
require.Equal(t, "test", st)
@@ -111,12 +111,12 @@ func TestCursor_Unpack(t *testing.T) {
111111
defer store.Release()
112112

113113
res := store.Get("test::key")
114-
op, err := createUpdateOp(store, res, "test-state-update")
114+
op, err := createUpdateOp(res, "test-state-update")
115115
require.NoError(t, err)
116116
defer op.done(1)
117117

118118
var st string
119-
cursor := makeCursor(store, store.Get("test::key"))
119+
cursor := makeCursor(store.Get("test::key"))
120120

121121
require.NoError(t, cursor.Unpack(&st))
122122
require.Equal(t, "test-state-update", st)

filebeat/input/filestream/internal/input-logfile/harvester.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ type defaultHarvesterGroup struct {
133133
harvester Harvester
134134
cleanTimeout time.Duration
135135
store *store
136+
ackCH *updateChan
136137
identifier *sourceIdentifier
137138
tg unison.TaskGroup
138139
}
@@ -191,7 +192,7 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest
191192

192193
client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
193194
CloseRef: ctx.Cancelation,
194-
ACKHandler: newInputACKHandler(ctx.Logger),
195+
ACKHandler: newInputACKHandler(hg.ackCH, ctx.Logger),
195196
})
196197
if err != nil {
197198
hg.readers.remove(srcID)
@@ -200,7 +201,7 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest
200201
defer client.Close()
201202

202203
hg.store.UpdateTTL(resource, hg.cleanTimeout)
203-
cursor := makeCursor(hg.store, resource)
204+
cursor := makeCursor(resource)
204205
publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}
205206

206207
err = hg.harvester.Run(ctx, s, cursor, publisher)

filebeat/input/filestream/internal/input-logfile/input.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
type managedInput struct {
3333
userID string
3434
manager *InputManager
35+
ackCH *updateChan
3536
sourceIdentifier *sourceIdentifier
3637
prospector Prospector
3738
harvester Harvester
@@ -67,6 +68,7 @@ func (inp *managedInput) Run(
6768
cleanTimeout: inp.cleanTimeout,
6869
harvester: inp.harvester,
6970
store: groupStore,
71+
ackCH: inp.ackCH,
7072
identifier: inp.sourceIdentifier,
7173
tg: unison.TaskGroup{},
7274
}
@@ -80,7 +82,7 @@ func (inp *managedInput) Run(
8082
return nil
8183
}
8284

83-
func newInputACKHandler(log *logp.Logger) beat.ACKer {
85+
func newInputACKHandler(ch *updateChan, log *logp.Logger) beat.ACKer {
8486
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
8587
var n uint
8688
var last int
@@ -101,6 +103,8 @@ func newInputACKHandler(log *logp.Logger) beat.ACKer {
101103
if n == 0 {
102104
return
103105
}
104-
private[last].(*updateOp).Execute(n)
106+
107+
op := private[last].(*updateOp)
108+
ch.Send(scheduledUpdate{op: op, n: n})
105109
})
106110
}

filebeat/input/filestream/internal/input-logfile/manager.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@ type InputManager struct {
6666
// that will be used to collect events from each source.
6767
Configure func(cfg *common.Config) (Prospector, Harvester, error)
6868

69-
initOnce sync.Once
70-
initErr error
71-
store *store
69+
initOnce sync.Once
70+
initErr error
71+
store *store
72+
ackUpdater *updateWriter
73+
ackCH *updateChan
7274
}
7375

7476
// Source describe a source the input can collect data from.
@@ -104,6 +106,8 @@ func (cim *InputManager) init() error {
104106
}
105107

106108
cim.store = store
109+
cim.ackCH = newUpdateChan()
110+
cim.ackUpdater = newUpdateWriter(store, cim.ackCH)
107111
})
108112

109113
return cim.initErr
@@ -144,6 +148,7 @@ func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
144148
}
145149

146150
func (cim *InputManager) shutdown() {
151+
cim.ackUpdater.Close()
147152
cim.store.Release()
148153
}
149154

@@ -178,6 +183,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
178183

179184
pStore := cim.getRetainedStore()
180185
defer pStore.Release()
186+
181187
prospectorStore := newSourceStore(pStore, sourceIdentifier)
182188
err = prospector.Init(prospectorStore)
183189
if err != nil {
@@ -186,6 +192,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
186192

187193
return &managedInput{
188194
manager: cim,
195+
ackCH: cim.ackCH,
189196
userID: settings.ID,
190197
prospector: prospector,
191198
harvester: harvester,

filebeat/input/filestream/internal/input-logfile/publish.go

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ type cursorPublisher struct {
5050
// instances can add update operations to be executed after already pending
5151
// update operations from older inputs instances that have been shutdown.
5252
type updateOp struct {
53-
store *store
5453
resource *resource
5554

5655
// state updates to persist
@@ -59,6 +58,18 @@ type updateOp struct {
5958
delta interface{}
6059
}
6160

61+
func newUpdateOp(resource *resource, ts time.Time, delta interface{}) *updateOp {
62+
return &updateOp{
63+
resource: resource,
64+
timestamp: ts,
65+
delta: delta,
66+
}
67+
}
68+
69+
func (op *updateOp) Key() string {
70+
return op.resource.key
71+
}
72+
6273
// Publish publishes an event. Publish returns false if the inputs cancellation context has been marked as done.
6374
// If cursorUpdate is not nil, Publish updates the in memory state and create and updateOp for the pending update.
6475
// It overwrite event.Private with the update operation, before finally sending the event.
@@ -69,7 +80,7 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er
6980
return c.forward(event)
7081
}
7182

72-
op, err := createUpdateOp(c.cursor.store, c.cursor.resource, cursorUpdate)
83+
op, err := createUpdateOp(c.cursor.resource, cursorUpdate)
7384
if err != nil {
7485
return err
7586
}
@@ -86,43 +97,27 @@ func (c *cursorPublisher) forward(event beat.Event) error {
8697
return c.canceler.Err()
8798
}
8899

89-
func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) {
100+
func createUpdateOp(resource *resource, updates interface{}) (*updateOp, error) {
90101
ts := time.Now()
91102

92103
resource.stateMutex.Lock()
93104
defer resource.stateMutex.Unlock()
94105

95-
cursor := resource.pendingCursor
96-
if resource.activeCursorOperations == 0 {
97-
var tmp interface{}
98-
typeconv.Convert(&tmp, cursor)
99-
resource.pendingCursor = tmp
100-
cursor = tmp
101-
}
102-
if err := typeconv.Convert(&cursor, updates); err != nil {
103-
return nil, err
104-
}
105-
resource.pendingCursor = cursor
106+
resource.pendingUpdate = updates
106107

107108
resource.Retain()
108109
resource.activeCursorOperations++
109-
return &updateOp{
110-
resource: resource,
111-
store: store,
112-
timestamp: ts,
113-
delta: updates,
114-
}, nil
110+
return newUpdateOp(resource, ts, updates), nil
115111
}
116112

117113
// done releases resources held by the last N updateOps.
118114
func (op *updateOp) done(n uint) {
119115
op.resource.UpdatesReleaseN(n)
120116
op.resource = nil
121-
*op = updateOp{}
122117
}
123118

124119
// Execute updates the persistent store with the scheduled changes and releases the resource.
125-
func (op *updateOp) Execute(n uint) {
120+
func (op *updateOp) Execute(store *store, n uint) {
126121
resource := op.resource
127122

128123
resource.stateMutex.Lock()
@@ -135,8 +130,8 @@ func (op *updateOp) Execute(n uint) {
135130
defer op.done(n)
136131
resource.activeCursorOperations -= n
137132
if resource.activeCursorOperations == 0 {
138-
resource.cursor = resource.pendingCursor
139-
resource.pendingCursor = nil
133+
resource.cursor = resource.pendingCursor()
134+
resource.pendingCursorValue = nil
140135
} else {
141136
typeconv.Convert(&resource.cursor, op.delta)
142137
}
@@ -145,13 +140,12 @@ func (op *updateOp) Execute(n uint) {
145140
resource.internalState.Updated = op.timestamp
146141
}
147142

148-
err := op.store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot())
143+
err := store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot())
149144
if err != nil {
150145
if !statestore.IsClosed(err) {
151-
op.store.log.Errorf("Failed to update state in the registry for '%v'", resource.key)
146+
store.log.Errorf("Failed to update state in the registry for '%v'", resource.key)
152147
}
153148
} else {
154-
resource.internalInSync = true
155149
resource.stored = true
156150
}
157151
}

0 commit comments

Comments
 (0)