Skip to content

Commit 2d07916

Browse files
committed
Store paths on structs instead of threading through params
Instead of passing *paths.Path through every method call, store it on diskQueue, readerLoop, writerLoop, and deleterLoop structs. This avoids cascading signature changes while keeping paths out of Settings. Also fixes the missed callsite in outputs/util.go.
1 parent 29b978c commit 2d07916

13 files changed

Lines changed: 69 additions & 56 deletions

File tree

libbeat/outputs/util.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ func Success(
6262
if err != nil {
6363
return Group{}, fmt.Errorf("unable to get disk queue settings: %w", err)
6464
}
65-
settings.Paths = beatPaths
66-
q = diskqueue.FactoryForSettings(settings)
65+
q = diskqueue.FactoryForSettings(settings, beatPaths)
6766
default:
6867
return Group{}, fmt.Errorf("unknown queue type: %s", cfg.Name())
6968
}

libbeat/publisher/pipeline/pipeline.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,7 @@ func queueFactoryForUserConfig(queueType string, userConfig *conf.C, paths *path
315315
if err != nil {
316316
return nil, err
317317
}
318-
settings.Paths = paths
319-
return diskqueue.FactoryForSettings(settings), nil
318+
return diskqueue.FactoryForSettings(settings, paths), nil
320319
default:
321320
return nil, fmt.Errorf("unrecognized queue type '%v'", queueType)
322321
}

libbeat/publisher/queue/diskqueue/benchmark_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/elastic/beats/v7/libbeat/publisher/queue"
4040
"github.com/elastic/elastic-agent-libs/logp"
4141
"github.com/elastic/elastic-agent-libs/mapstr"
42+
"github.com/elastic/elastic-agent-libs/paths"
4243
)
4344

4445
var (
@@ -78,7 +79,7 @@ func setup(b *testing.B, compress bool, protobuf bool) (*diskQueue, queue.Produc
7879
s.Path = b.TempDir()
7980

8081
s.UseCompression = compress
81-
q, err := NewQueue(logp.NewNopLogger(), nil, s, nil)
82+
q, err := NewQueue(logp.NewNopLogger(), nil, s, nil, &paths.Path{})
8283
if err != nil {
8384
panic(err)
8485
}

libbeat/publisher/queue/diskqueue/config.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,6 @@ import (
3131
// Settings contains the configuration fields to create a new disk queue
3232
// or open an existing one.
3333
type Settings struct {
34-
// Paths holds the per-beat paths definition. When set, it is used to
35-
// resolve the default diskqueue directory instead of the global
36-
// paths.Resolve. This is important for beat receivers where each beat
37-
// needs its own data directory.
38-
Paths *paths.Path
39-
4034
// The path on disk of the queue's containing directory, which will be
4135
// created if it doesn't exist. Within the directory, the queue's state
4236
// is stored in state.dat and each segment's data is stored in
@@ -127,7 +121,7 @@ func (c *userConfig) Validate() error {
127121
func DefaultSettings() Settings {
128122
return Settings{
129123
MaxSegmentSize: 100 * (1 << 20), // 100MiB
130-
MaxBufferSize: (1 << 30), // 1GiB
124+
MaxBufferSize: 1 << 30, // 1GiB
131125

132126
ReadAheadLimit: 512,
133127
WriteAheadLimit: 2048,
@@ -177,20 +171,20 @@ func SettingsForUserConfig(config *config.C) (Settings, error) {
177171
// bookkeeping helpers
178172
//
179173

180-
func (settings Settings) directoryPath() string {
174+
func (settings Settings) directoryPath(fallback *paths.Path) string {
181175
if settings.Path == "" {
182-
return settings.Paths.Resolve(paths.Data, "diskqueue")
176+
return fallback.Resolve(paths.Data, "diskqueue")
183177
}
184178
return settings.Path
185179
}
186180

187-
func (settings Settings) stateFilePath() string {
188-
return filepath.Join(settings.directoryPath(), "state.dat")
181+
func (settings Settings) stateFilePath(fallback *paths.Path) string {
182+
return filepath.Join(settings.directoryPath(fallback), "state.dat")
189183
}
190184

191-
func (settings Settings) segmentPath(segmentID segmentID) string {
185+
func (settings Settings) segmentPath(segmentID segmentID, fallback *paths.Path) string {
192186
return filepath.Join(
193-
settings.directoryPath(),
187+
settings.directoryPath(fallback),
194188
fmt.Sprintf("%v.seg", segmentID))
195189
}
196190

libbeat/publisher/queue/diskqueue/config_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,30 @@ import (
2929
func TestDirectoryPath(t *testing.T) {
3030
tests := map[string]struct {
3131
settings Settings
32+
fallback *paths.Path
3233
expected string
3334
}{
3435
"explicit path takes precedence": {
3536
settings: Settings{
3637
Path: "/custom/queue/path",
37-
Paths: &paths.Path{
38-
Data: "/beat/data",
39-
},
38+
},
39+
fallback: &paths.Path{
40+
Data: "/beat/data",
4041
},
4142
expected: "/custom/queue/path",
4243
},
4344
"per-beat paths used when Path is empty": {
44-
settings: Settings{
45-
Paths: &paths.Path{
46-
Data: "/beat/data",
47-
},
45+
settings: Settings{},
46+
fallback: &paths.Path{
47+
Data: "/beat/data",
4848
},
4949
expected: filepath.Join("/beat/data", "diskqueue"),
5050
},
5151
}
5252

5353
for name, test := range tests {
5454
t.Run(name, func(t *testing.T) {
55-
result := test.settings.directoryPath()
55+
result := test.settings.directoryPath(test.fallback)
5656
assert.Equal(t, test.expected, result)
5757
})
5858
}

libbeat/publisher/queue/diskqueue/core_loop.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) {
168168

169169
dq.logger.Errorf(
170170
"Error reading segment file %s: %v",
171-
dq.settings.segmentPath(segment.id), response.err)
171+
dq.settings.segmentPath(segment.id, dq.paths), response.err)
172172
}
173173
}
174174

libbeat/publisher/queue/diskqueue/deleter_loop.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ import (
2121
"errors"
2222
"os"
2323
"time"
24+
25+
"github.com/elastic/elastic-agent-libs/paths"
2426
)
2527

2628
type deleterLoop struct {
29+
paths *paths.Path
2730
// The settings for the queue that created this loop.
2831
settings Settings
2932

@@ -47,8 +50,9 @@ type deleterLoopResponse struct {
4750
results []error
4851
}
4952

50-
func newDeleterLoop(settings Settings) *deleterLoop {
53+
func newDeleterLoop(settings Settings, paths *paths.Path) *deleterLoop {
5154
return &deleterLoop{
55+
paths: paths,
5256
settings: settings,
5357

5458
requestChan: make(chan deleterLoopRequest, 1),
@@ -67,7 +71,7 @@ func (dl *deleterLoop) run() {
6771
results := []error{}
6872
deletedCount := 0
6973
for _, segment := range request.segments {
70-
path := dl.settings.segmentPath(segment.id)
74+
path := dl.settings.segmentPath(segment.id, dl.paths)
7175
err := os.Remove(path)
7276
// We ignore errors caused by the file not existing: this shouldn't
7377
// happen, but it is still safe to report it as successfully removed.

libbeat/publisher/queue/diskqueue/queue.go

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/elastic/beats/v7/libbeat/publisher/queue"
2727
"github.com/elastic/elastic-agent-libs/logp"
28+
"github.com/elastic/elastic-agent-libs/paths"
2829
)
2930

3031
// The string used to specify this queue in beats configurations.
@@ -36,6 +37,7 @@ type diskQueue struct {
3637
logger *logp.Logger
3738
observer queue.Observer
3839
settings Settings
40+
paths *paths.Path
3941

4042
// Metadata related to the segment files.
4143
segments diskQueueSegments
@@ -95,14 +97,14 @@ type diskQueue struct {
9597
// FactoryForSettings is a simple wrapper around NewQueue so a concrete
9698
// Settings object can be wrapped in a queue-agnostic interface for
9799
// later use by the pipeline.
98-
func FactoryForSettings(settings Settings) queue.QueueFactory {
100+
func FactoryForSettings(settings Settings, paths *paths.Path) queue.QueueFactory {
99101
return func(
100102
logger *logp.Logger,
101103
observer queue.Observer,
102104
inputQueueSize int,
103105
encoderFactory queue.EncoderFactory,
104106
) (queue.Queue, error) {
105-
return NewQueue(logger, observer, settings, encoderFactory)
107+
return NewQueue(logger, observer, settings, encoderFactory, paths)
106108
}
107109
}
108110

@@ -113,10 +115,14 @@ func NewQueue(
113115
observer queue.Observer,
114116
settings Settings,
115117
encoderFactory queue.EncoderFactory,
118+
paths *paths.Path,
116119
) (*diskQueue, error) {
120+
if paths == nil {
121+
return nil, errors.New("got nil paths")
122+
}
117123
logger = logger.Named("diskqueue")
118124
logger.Debugf(
119-
"Initializing disk queue at path %v", settings.directoryPath())
125+
"Initializing disk queue at path %v", settings.directoryPath(paths))
120126
if observer == nil {
121127
observer = queue.NewQueueObserver(nil)
122128
}
@@ -131,13 +137,13 @@ func NewQueue(
131137
observer.MaxBytes(int(settings.MaxBufferSize)) //nolint:gosec // G115 Conversion from uint64 to int is safe here.
132138

133139
// Create the given directory path if it doesn't exist.
134-
err := os.MkdirAll(settings.directoryPath(), os.ModePerm)
140+
err := os.MkdirAll(settings.directoryPath(paths), os.ModePerm)
135141
if err != nil {
136142
return nil, fmt.Errorf("couldn't create disk queue directory: %w", err)
137143
}
138144

139145
// Load the previous queue position, if any.
140-
nextReadPosition, err := queuePositionFromPath(settings.stateFilePath())
146+
nextReadPosition, err := queuePositionFromPath(settings.stateFilePath(paths))
141147
if err != nil && !errors.Is(err, os.ErrNotExist) {
142148
// Errors reading / writing the position are non-fatal -- we just log a
143149
// warning and fall back on the oldest existing segment, if any.
@@ -153,7 +159,7 @@ func NewQueue(
153159
nextReadPosition.byteIndex = 0
154160
}
155161
positionFile, err := os.OpenFile(
156-
settings.stateFilePath(), os.O_WRONLY|os.O_CREATE, 0600)
162+
settings.stateFilePath(paths), os.O_WRONLY|os.O_CREATE, 0600)
157163
if err != nil {
158164
// This is not the _worst_ error: we could try operating even without a
159165
// position file. But it indicates a problem with the queue permissions on
@@ -166,7 +172,7 @@ func NewQueue(
166172

167173
// Index any existing data segments to be placed in segments.reading.
168174
initialSegments, err :=
169-
scanExistingSegments(logger, settings.directoryPath())
175+
scanExistingSegments(logger, settings.directoryPath(paths))
170176
if err != nil {
171177
return nil, err
172178
}
@@ -219,6 +225,7 @@ func NewQueue(
219225
logger: logger,
220226
observer: observer,
221227
settings: settings,
228+
paths: paths,
222229

223230
segments: diskQueueSegments{
224231
reading: initialSegments,
@@ -229,9 +236,9 @@ func NewQueue(
229236

230237
acks: newDiskQueueACKs(logger, nextReadPosition, positionFile),
231238

232-
readerLoop: newReaderLoop(settings, encoder),
233-
writerLoop: newWriterLoop(logger, settings),
234-
deleterLoop: newDeleterLoop(settings),
239+
readerLoop: newReaderLoop(settings, encoder, paths),
240+
writerLoop: newWriterLoop(logger, settings, paths),
241+
deleterLoop: newDeleterLoop(settings, paths),
235242

236243
producerWriteRequestChan: make(chan producerWriteRequest),
237244

libbeat/publisher/queue/diskqueue/queue_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/elastic/beats/v7/libbeat/publisher/queue"
2727
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
2828
"github.com/elastic/elastic-agent-libs/logp/logptest"
29+
"github.com/elastic/elastic-agent-libs/paths"
2930
)
3031

3132
var seed int64
@@ -78,7 +79,7 @@ func makeTestQueue() queuetest.QueueFactory {
7879
settings := DefaultSettings()
7980
settings.Path = dir
8081
logger := logptest.NewTestingLogger(t, "")
81-
queue, _ := NewQueue(logger, nil, settings, nil)
82+
queue, _ := NewQueue(logger, nil, settings, nil, &paths.Path{})
8283
return testQueue{
8384
diskQueue: queue,
8485
}

libbeat/publisher/queue/diskqueue/reader_loop.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"io"
2424

2525
"github.com/elastic/beats/v7/libbeat/publisher/queue"
26+
"github.com/elastic/elastic-agent-libs/paths"
2627
)
2728

2829
// startPosition and endPosition are absolute byte offsets into the segment
@@ -51,6 +52,7 @@ type readerLoopResponse struct {
5152
type readerLoop struct {
5253
// The settings for the queue that created this loop.
5354
settings Settings
55+
paths *paths.Path
5456

5557
// When there is a block available for reading, it will be sent to
5658
// requestChan. When the reader loop has finished processing it, it
@@ -76,9 +78,10 @@ type readerLoop struct {
7678
outputEncoder queue.Encoder
7779
}
7880

79-
func newReaderLoop(settings Settings, outputEncoder queue.Encoder) *readerLoop {
81+
func newReaderLoop(settings Settings, outputEncoder queue.Encoder, paths *paths.Path) *readerLoop {
8082
return &readerLoop{
8183
settings: settings,
84+
paths: paths,
8285

8386
requestChan: make(chan readerLoopRequest, 1),
8487
responseChan: make(chan readerLoopResponse),
@@ -107,7 +110,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon
107110
nextFrameID := request.startFrameID
108111

109112
// Open the file and seek to the starting position.
110-
handle, err := request.segment.getReader(rl.settings)
113+
handle, err := request.segment.getReader(rl.settings, rl.paths)
111114
if err != nil {
112115
return readerLoopResponse{err: err}
113116
}

0 commit comments

Comments
 (0)