
Commit b0236ee

[libbeat] Add configurable exponential backoff for disk queue write errors (#21493)

1 parent bcb8da0 commit b0236ee
5 files changed: 104 additions & 16 deletions
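The commit makes the retry behavior user-configurable through two new disk queue settings, retry_interval and max_retry_interval. Below is a minimal Go sketch, not part of this commit, of how the parsed settings could be obtained; it assumes libbeat's common.MustNewConfigFrom helper and uses hypothetical values purely for illustration.

// Sketch only: hypothetical values, assumes common.MustNewConfigFrom exists
// and that the config keys unpack as shown.
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
)

func main() {
	cfg := common.MustNewConfigFrom(map[string]interface{}{
		"path":               "/var/lib/mybeat/diskqueue", // hypothetical path
		"max_size":           "10GB",
		"retry_interval":     "1s",  // initial backoff on write errors
		"max_retry_interval": "30s", // cap for the exponential backoff
	})
	settings, err := diskqueue.SettingsForUserConfig(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(settings.RetryInterval, settings.MaxRetryInterval)
}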


libbeat/publisher/queue/diskqueue/config.go

Lines changed: 47 additions & 5 deletions
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"path/filepath"
+	"time"
 
 	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/common/cfgtype"
@@ -61,16 +62,26 @@ type Settings struct {
 	// A listener that should be sent ACKs when an event is successfully
 	// written to disk.
 	WriteToDiskListener queue.ACKListener
+
+	// RetryInterval specifies how long to wait before retrying a fatal error
+	// writing to disk. If MaxRetryInterval is nonzero, subsequent retries will
+	// use exponential backoff up to the specified limit.
+	RetryInterval    time.Duration
+	MaxRetryInterval time.Duration
 }
 
 // userConfig holds the parameters for a disk queue that are configurable
 // by the end user in the beats yml file.
 type userConfig struct {
-	Path            string            `config:"path"`
-	MaxSize         cfgtype.ByteSize  `config:"max_size" validate:"required"`
-	SegmentSize     *cfgtype.ByteSize `config:"segment_size"`
-	ReadAheadLimit  *int              `config:"read_ahead"`
-	WriteAheadLimit *int              `config:"write_ahead"`
+	Path        string            `config:"path"`
+	MaxSize     cfgtype.ByteSize  `config:"max_size" validate:"required"`
+	SegmentSize *cfgtype.ByteSize `config:"segment_size"`
+
+	ReadAheadLimit  *int `config:"read_ahead"`
+	WriteAheadLimit *int `config:"write_ahead"`
+
+	RetryInterval    *time.Duration `config:"retry_interval" validate:"positive"`
+	MaxRetryInterval *time.Duration `config:"max_retry_interval" validate:"positive"`
 }
 
 func (c *userConfig) Validate() error {
@@ -96,6 +107,13 @@ func (c *userConfig) Validate() error {
 			"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
 	}
 
+	if c.RetryInterval != nil && c.MaxRetryInterval != nil &&
+		*c.MaxRetryInterval < *c.RetryInterval {
+		return fmt.Errorf(
+			"Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
+			*c.MaxRetryInterval, *c.RetryInterval)
+	}
+
 	return nil
 }
 
@@ -108,6 +126,9 @@ func DefaultSettings() Settings {
 
 		ReadAheadLimit:  512,
 		WriteAheadLimit: 2048,
+
+		RetryInterval:    1 * time.Second,
+		MaxRetryInterval: 30 * time.Second,
 	}
 }
 
@@ -137,6 +158,13 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) {
 		settings.WriteAheadLimit = *userConfig.WriteAheadLimit
 	}
 
+	if userConfig.RetryInterval != nil {
+		settings.RetryInterval = *userConfig.RetryInterval
+	}
+	if userConfig.MaxRetryInterval != nil {
+		settings.MaxRetryInterval = *userConfig.MaxRetryInterval
+	}
+
 	return settings, nil
 }
 
@@ -164,3 +192,17 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
 func (settings Settings) maxSegmentOffset() segmentOffset {
 	return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
 }
+
+// Given a retry interval, nextRetryInterval returns the next higher level
+// of backoff.
+func (settings Settings) nextRetryInterval(
+	currentInterval time.Duration,
+) time.Duration {
+	if settings.MaxRetryInterval > 0 {
+		currentInterval *= 2
+		if currentInterval > settings.MaxRetryInterval {
+			currentInterval = settings.MaxRetryInterval
+		}
+	}
+	return currentInterval
+}
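
To make the backoff rule concrete: nextRetryInterval doubles the interval on every consecutive failure and caps it at MaxRetryInterval (no growth when the cap is zero). The following standalone sketch, not part of the commit, reimplements that rule with the 1s/30s defaults from DefaultSettings to show the resulting progression.

package main

import (
	"fmt"
	"time"
)

// next mirrors Settings.nextRetryInterval: double the interval on each
// consecutive failure, capped at max (no growth when max is zero).
func next(current, max time.Duration) time.Duration {
	if max > 0 {
		current *= 2
		if current > max {
			current = max
		}
	}
	return current
}

func main() {
	interval := 1 * time.Second // default RetryInterval
	max := 30 * time.Second     // default MaxRetryInterval
	for i := 0; i < 7; i++ {
		fmt.Println(interval) // 1s, 2s, 4s, 8s, 16s, 30s, 30s
		interval = next(interval, max)
	}
}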

libbeat/publisher/queue/diskqueue/deleter_loop.go

Lines changed: 7 additions & 2 deletions
@@ -57,6 +57,7 @@ func newDeleterLoop(settings Settings) *deleterLoop {
 }
 
 func (dl *deleterLoop) run() {
+	currentRetryInterval := dl.settings.RetryInterval
 	for {
 		request, ok := <-dl.requestChan
 		if !ok {
@@ -87,10 +88,14 @@ func (dl *deleterLoop) run() {
 			// The delay can be interrupted if the request channel is closed,
 			// indicating queue shutdown.
 			select {
-			// TODO: make the retry interval configurable.
-			case <-time.After(time.Second):
+			case <-time.After(currentRetryInterval):
 			case <-dl.requestChan:
 			}
+			currentRetryInterval =
+				dl.settings.nextRetryInterval(currentRetryInterval)
+		} else {
+			// If we made progress, reset the retry interval.
+			currentRetryInterval = dl.settings.RetryInterval
 		}
 		dl.responseChan <- deleterLoopResponse{
 			results: results,
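
The select above waits out the retry interval but returns immediately if the request channel is closed, so backoff never delays shutdown. Here is a small standalone sketch of that wait-or-abort pattern, with a generic struct{} channel standing in for the queue's request channel (an assumption made only for this example).

package main

import (
	"fmt"
	"time"
)

// waitRetry sleeps for the current backoff interval but returns early if
// the request channel becomes readable or is closed, signalling shutdown.
// This mirrors the select used by the deleter and writer loops.
func waitRetry(interval time.Duration, requestChan <-chan struct{}) (aborted bool) {
	select {
	case <-time.After(interval):
		return false
	case <-requestChan:
		// A closed channel unblocks immediately.
		return true
	}
}

func main() {
	requests := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(requests) // simulate queue shutdown mid-backoff
	}()
	fmt.Println("aborted:", waitRetry(5*time.Second, requests))
}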

libbeat/publisher/queue/diskqueue/segments.go

Lines changed: 7 additions & 2 deletions
@@ -207,10 +207,15 @@ func (segment *queueSegment) getWriter(
 // retry callback returns true. This is used for timed retries when
 // creating a queue segment from the writer loop.
 func (segment *queueSegment) getWriterWithRetry(
-	queueSettings Settings, retry func(error) bool,
+	queueSettings Settings, retry func(err error, firstTime bool) bool,
 ) (*os.File, error) {
+	firstTime := true
 	file, err := segment.getWriter(queueSettings)
-	for err != nil && retry(err) {
+	for err != nil && retry(err, firstTime) {
+		// Set firstTime to false so the retry callback can perform backoff
+		// etc if needed.
+		firstTime = false
+
 		// Try again
 		file, err = segment.getWriter(queueSettings)
 	}

libbeat/publisher/queue/diskqueue/util.go

Lines changed: 19 additions & 3 deletions
@@ -69,16 +69,32 @@ func writeErrorIsRetriable(err error) bool {
 // "wrapped" field in-place as long as it isn't captured by the callback.
 type callbackRetryWriter struct {
 	wrapped io.Writer
-	retry   func(error) bool
+
+	// The retry callback is called with the error that was produced and whether
+	// this is the first (subsequent) error arising from this particular
+	// write call.
+	retry func(err error, firstTime bool) bool
 }
 
 func (w callbackRetryWriter) Write(p []byte) (int, error) {
+	// firstTime tracks whether the current error is the first subsequent error
+	// being passed to the retry callback. This is so that the callback can
+	// reset its internal counters in case it is using exponential backoff or
+	// a retry limit.
+	firstTime := true
 	bytesWritten := 0
 	writer := w.wrapped
 	n, err := writer.Write(p)
 	for n < len(p) {
-		if err != nil && !w.retry(err) {
-			return bytesWritten + n, err
+		if err != nil {
+			shouldRetry := w.retry(err, firstTime)
+			firstTime = false
+			if !shouldRetry {
+				return bytesWritten + n, err
+			}
+		} else {
+			// If we made progress without an error, reset firstTime.
+			firstTime = true
 		}
 		// Advance p and try again.
 		bytesWritten += n
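
For illustration only, here is a simplified standalone variant of the callbackRetryWriter contract shown above: keep writing until all bytes are accepted, ask retry(err, firstTime) whether to continue after each error, and reset firstTime once progress resumes. The flakyWriter type and the always-retry policy are invented for this sketch and are not part of the commit.

package main

import (
	"errors"
	"fmt"
	"io"
)

// flakyWriter is a made-up io.Writer that fails once with a short write
// and then succeeds, to exercise the retry path.
type flakyWriter struct{ calls int }

func (f *flakyWriter) Write(p []byte) (int, error) {
	f.calls++
	if f.calls == 1 {
		return 2, errors.New("transient disk error")
	}
	return len(p), nil
}

// retryWriter follows the same callback contract as callbackRetryWriter:
// retry(err, firstTime) is consulted on every error, and firstTime resets
// whenever a write makes progress without an error.
type retryWriter struct {
	wrapped io.Writer
	retry   func(err error, firstTime bool) bool
}

func (w retryWriter) Write(p []byte) (int, error) {
	firstTime := true
	written := 0
	for written < len(p) {
		n, err := w.wrapped.Write(p[written:])
		written += n
		if err != nil {
			if !w.retry(err, firstTime) {
				return written, err
			}
			firstTime = false
		} else {
			firstTime = true
		}
	}
	return written, nil
}

func main() {
	w := retryWriter{
		wrapped: &flakyWriter{},
		retry: func(err error, firstTime bool) bool {
			fmt.Printf("retrying after error (firstTime=%v): %v\n", firstTime, err)
			return true
		},
	}
	n, err := w.Write([]byte("hello disk queue"))
	fmt.Println(n, err) // 16 <nil>
}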

libbeat/publisher/queue/diskqueue/writer_loop.go

Lines changed: 24 additions & 4 deletions
@@ -82,6 +82,8 @@ type writerLoop struct {
 	// The file handle corresponding to currentSegment. When currentSegment
 	// changes, this handle is closed and a new one is created.
 	outputFile *os.File
+
+	currentRetryInterval time.Duration
 }
 
 func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
@@ -91,6 +93,8 @@ func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
 
 		requestChan:  make(chan writerLoopRequest, 1),
 		responseChan: make(chan writerLoopResponse),
+
+		currentRetryInterval: settings.RetryInterval,
 	}
 }
 
@@ -215,23 +219,39 @@ outerLoop:
 	return append(bytesWritten, curBytesWritten)
 }
 
-// retryCallback is called (by way of retryCallbackWriter) when there is
+func (wl *writerLoop) applyRetryBackoff() {
+	wl.currentRetryInterval =
+		wl.settings.nextRetryInterval(wl.currentRetryInterval)
+}
+
+func (wl *writerLoop) resetRetryBackoff() {
+	wl.currentRetryInterval = wl.settings.RetryInterval
+}
+
+// retryCallback is called (by way of callbackRetryWriter) when there is
 // an error writing to a segment file. It pauses for a configurable
 // interval and returns true if the operation should be retried (which
 // it always should, unless the queue is being closed).
-func (wl *writerLoop) retryCallback(err error) bool {
+func (wl *writerLoop) retryCallback(err error, firstTime bool) bool {
+	if firstTime {
+		// Reset any exponential backoff in the retry interval.
+		wl.resetRetryBackoff()
+	}
 	if writeErrorIsRetriable(err) {
 		return true
 	}
+	// If this error isn't immediately retriable, increase the exponential
+	// backoff afterwards.
+	defer wl.applyRetryBackoff()
+
 	// If the error is not immediately retriable, log the error
 	// and wait for the retry interval before trying again, but
 	// abort if the queue is closed (indicated by the request channel
 	// becoming unblocked).
 	wl.logger.Errorf("Writing to segment %v: %v",
 		wl.currentSegment.id, err)
 	select {
-	case <-time.After(time.Second):
-		// TODO: use a configurable interval here
+	case <-time.After(wl.currentRetryInterval):
 		return true
 	case <-wl.requestChan:
 		return false
