@@ -21,6 +21,7 @@ import (
2121 "errors"
2222 "fmt"
2323 "path/filepath"
24+ "time"
2425
2526 "github.com/elastic/beats/v7/libbeat/common"
2627 "github.com/elastic/beats/v7/libbeat/common/cfgtype"
@@ -61,16 +62,26 @@ type Settings struct {
6162 // A listener that should be sent ACKs when an event is successfully
6263 // written to disk.
6364 WriteToDiskListener queue.ACKListener
65+
66+ // RetryInterval specifies how long to wait before retrying a fatal error
67+ // writing to disk. If MaxRetryInterval is nonzero, subsequent retries will
68+ // use exponential backoff up to the specified limit.
69+ RetryInterval time.Duration
70+ MaxRetryInterval time.Duration
6471}
6572
6673// userConfig holds the parameters for a disk queue that are configurable
6774// by the end user in the beats yml file.
6875type userConfig struct {
69- Path string `config:"path"`
70- MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"`
71- SegmentSize * cfgtype.ByteSize `config:"segment_size"`
72- ReadAheadLimit * int `config:"read_ahead"`
73- WriteAheadLimit * int `config:"write_ahead"`
76+ Path string `config:"path"`
77+ MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"`
78+ SegmentSize * cfgtype.ByteSize `config:"segment_size"`
79+
80+ ReadAheadLimit * int `config:"read_ahead"`
81+ WriteAheadLimit * int `config:"write_ahead"`
82+
83+ RetryInterval * time.Duration `config:"retry_interval" validate:"positive"`
84+ MaxRetryInterval * time.Duration `config:"max_retry_interval" validate:"positive"`
7485}
7586
7687func (c * userConfig ) Validate () error {
@@ -96,6 +107,13 @@ func (c *userConfig) Validate() error {
96107 "Disk queue segment_size (%d) cannot be less than 1MB" , * c .SegmentSize )
97108 }
98109
110+ if c .RetryInterval != nil && c .MaxRetryInterval != nil &&
111+ * c .MaxRetryInterval < * c .RetryInterval {
112+ return fmt .Errorf (
113+ "Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)" ,
114+ * c .MaxRetryInterval , * c .RetryInterval )
115+ }
116+
99117 return nil
100118}
101119
@@ -108,6 +126,9 @@ func DefaultSettings() Settings {
108126
109127 ReadAheadLimit : 512 ,
110128 WriteAheadLimit : 2048 ,
129+
130+ RetryInterval : 1 * time .Second ,
131+ MaxRetryInterval : 30 * time .Second ,
111132 }
112133}
113134
@@ -137,6 +158,13 @@ func SettingsForUserConfig(config *common.Config) (Settings, error) {
137158 settings .WriteAheadLimit = * userConfig .WriteAheadLimit
138159 }
139160
161+ if userConfig .RetryInterval != nil {
162+ settings .RetryInterval = * userConfig .RetryInterval
163+ }
164+ if userConfig .MaxRetryInterval != nil {
165+ settings .MaxRetryInterval = * userConfig .RetryInterval
166+ }
167+
140168 return settings , nil
141169}
142170
@@ -164,3 +192,17 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
164192func (settings Settings ) maxSegmentOffset () segmentOffset {
165193 return segmentOffset (settings .MaxSegmentSize - segmentHeaderSize )
166194}
195+
196+ // Given a retry interval, nextRetryInterval returns the next higher level
197+ // of backoff.
198+ func (settings Settings ) nextRetryInterval (
199+ currentInterval time.Duration ,
200+ ) time.Duration {
201+ if settings .MaxRetryInterval > 0 {
202+ currentInterval *= 2
203+ if currentInterval > settings .MaxRetryInterval {
204+ currentInterval = settings .MaxRetryInterval
205+ }
206+ }
207+ return currentInterval
208+ }
0 commit comments