@@ -25,6 +25,7 @@ import (
2525
2626 "github.com/elastic/beats/v7/libbeat/publisher/queue"
2727 "github.com/elastic/elastic-agent-libs/logp"
28+ "github.com/elastic/elastic-agent-libs/paths"
2829)
2930
3031// The string used to specify this queue in beats configurations.
@@ -36,6 +37,7 @@ type diskQueue struct {
3637 logger * logp.Logger
3738 observer queue.Observer
3839 settings Settings
40+ paths * paths.Path
3941
4042 // Metadata related to the segment files.
4143 segments diskQueueSegments
@@ -95,14 +97,14 @@ type diskQueue struct {
9597// FactoryForSettings is a simple wrapper around NewQueue so a concrete
9698// Settings object can be wrapped in a queue-agnostic interface for
9799// later use by the pipeline.
98- func FactoryForSettings (settings Settings ) queue.QueueFactory {
100+ func FactoryForSettings (settings Settings , paths * paths. Path ) queue.QueueFactory {
99101 return func (
100102 logger * logp.Logger ,
101103 observer queue.Observer ,
102104 inputQueueSize int ,
103105 encoderFactory queue.EncoderFactory ,
104106 ) (queue.Queue , error ) {
105- return NewQueue (logger , observer , settings , encoderFactory )
107+ return NewQueue (logger , observer , settings , encoderFactory , paths )
106108 }
107109}
108110
@@ -113,10 +115,14 @@ func NewQueue(
113115 observer queue.Observer ,
114116 settings Settings ,
115117 encoderFactory queue.EncoderFactory ,
118+ paths * paths.Path ,
116119) (* diskQueue , error ) {
120+ if paths == nil {
121+ return nil , errors .New ("got nil paths" )
122+ }
117123 logger = logger .Named ("diskqueue" )
118124 logger .Debugf (
119- "Initializing disk queue at path %v" , settings .directoryPath ())
125+ "Initializing disk queue at path %v" , settings .directoryPath (paths ))
120126 if observer == nil {
121127 observer = queue .NewQueueObserver (nil )
122128 }
@@ -131,13 +137,13 @@ func NewQueue(
131137 observer .MaxBytes (int (settings .MaxBufferSize )) //nolint:gosec // G115 Conversion from uint64 to int is safe here.
132138
133139 // Create the given directory path if it doesn't exist.
134- err := os .MkdirAll (settings .directoryPath (), os .ModePerm )
140+ err := os .MkdirAll (settings .directoryPath (paths ), os .ModePerm )
135141 if err != nil {
136142 return nil , fmt .Errorf ("couldn't create disk queue directory: %w" , err )
137143 }
138144
139145 // Load the previous queue position, if any.
140- nextReadPosition , err := queuePositionFromPath (settings .stateFilePath ())
146+ nextReadPosition , err := queuePositionFromPath (settings .stateFilePath (paths ))
141147 if err != nil && ! errors .Is (err , os .ErrNotExist ) {
142148 // Errors reading / writing the position are non-fatal -- we just log a
143149 // warning and fall back on the oldest existing segment, if any.
@@ -153,7 +159,7 @@ func NewQueue(
153159 nextReadPosition .byteIndex = 0
154160 }
155161 positionFile , err := os .OpenFile (
156- settings .stateFilePath (), os .O_WRONLY | os .O_CREATE , 0600 )
162+ settings .stateFilePath (paths ), os .O_WRONLY | os .O_CREATE , 0600 )
157163 if err != nil {
158164 // This is not the _worst_ error: we could try operating even without a
159165 // position file. But it indicates a problem with the queue permissions on
@@ -166,7 +172,7 @@ func NewQueue(
166172
167173 // Index any existing data segments to be placed in segments.reading.
168174 initialSegments , err :=
169- scanExistingSegments (logger , settings .directoryPath ())
175+ scanExistingSegments (logger , settings .directoryPath (paths ))
170176 if err != nil {
171177 return nil , err
172178 }
@@ -219,6 +225,7 @@ func NewQueue(
219225 logger : logger ,
220226 observer : observer ,
221227 settings : settings ,
228+ paths : paths ,
222229
223230 segments : diskQueueSegments {
224231 reading : initialSegments ,
@@ -229,9 +236,9 @@ func NewQueue(
229236
230237 acks : newDiskQueueACKs (logger , nextReadPosition , positionFile ),
231238
232- readerLoop : newReaderLoop (settings , encoder ),
233- writerLoop : newWriterLoop (logger , settings ),
234- deleterLoop : newDeleterLoop (settings ),
239+ readerLoop : newReaderLoop (settings , encoder , paths ),
240+ writerLoop : newWriterLoop (logger , settings , paths ),
241+ deleterLoop : newDeleterLoop (settings , paths ),
235242
236243 producerWriteRequestChan : make (chan producerWriteRequest ),
237244
0 commit comments