@@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
9393 // than an entire segment all by itself (as long as it isn't, it is
9494 // guaranteed to eventually enter the queue assuming no disk errors).
9595 frameSize := request .frame .sizeOnDisk ()
96- if dq .settings .maxSegmentOffset () < segmentOffset ( frameSize ) {
96+ if frameSize > dq .settings .maxValidFrameSize ( ) {
9797 dq .logger .Warnf (
9898 "Rejecting event with size %v because the segment buffer limit is %v" ,
99- frameSize , dq .settings .maxSegmentOffset ())
99+ frameSize , dq .settings .maxValidFrameSize ())
100100 request .responseChan <- false
101101 return
102102 }
@@ -129,14 +129,15 @@ func (dq *diskQueue) handleWriterLoopResponse(response writerLoopResponse) {
129129 // The writer loop response contains the number of bytes written to
130130 // each segment that appeared in the request. Entries always appear in
131131 // the same sequence as (the beginning of) segments.writing.
132- for index , bytesWritten := range response .bytesWritten {
132+ for index , segmentEntry := range response .segments {
133133 // Update the segment with its new size.
134- dq .segments .writing [index ].endOffset += segmentOffset (bytesWritten )
134+ dq .segments .writing [index ].byteCount += segmentEntry .bytesWritten
135+ dq .segments .writing [index ].frameCount += segmentEntry .framesWritten
135136 }
136137
137138 // If there is more than one segment in the response, then all but the
138139 // last have been closed and are ready to move to the reading list.
139- closedCount := len (response .bytesWritten ) - 1
140+ closedCount := len (response .segments ) - 1
140141 if closedCount > 0 {
141142 // Remove the prefix of the writing array and append to to reading.
142143 closedSegments := dq .segments .writing [:closedCount ]
@@ -151,37 +152,19 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) {
151152
152153 // Advance the frame / offset based on what was just completed.
153154 dq .segments .nextReadFrameID += frameID (response .frameCount )
154- dq .segments .nextReadOffset += segmentOffset ( response .byteCount )
155+ dq .segments .nextReadPosition += response .byteCount
155156
156- var segment * queueSegment
157- if len (dq .segments .reading ) > 0 {
158- // A segment is finished if we have read all the data, or
159- // the read response reports an error.
160- // Segments in the reading list have been completely written,
161- // so we can rely on their endOffset field to determine their size.
162- segment = dq .segments .reading [0 ]
163- if dq .segments .nextReadOffset >= segment .endOffset || response .err != nil {
164- dq .segments .reading = dq .segments .reading [1 :]
165- dq .segments .acking = append (dq .segments .acking , segment )
166- dq .segments .nextReadOffset = 0
167- }
168- } else {
169- // A segment in the writing list can't be finished writing,
170- // so we don't check the endOffset.
171- segment = dq .segments .writing [0 ]
172- if response .err != nil {
173- // Errors reading a writing segment are awkward since we can't discard
174- // them until the writer loop is done with them. Instead we just seek
175- // to the end of the current data region. If we're lucky this lets us
176- // skip the intervening errors; if not, the segment will be cleaned up
177- // after the writer loop is done with it.
178- dq .segments .nextReadOffset = segment .endOffset
179- }
180- }
157+ segment := dq .segments .readingSegment ()
181158 segment .framesRead += response .frameCount
182-
183- // If there was an error, report it.
184159 if response .err != nil {
160+ // If there's an error, we advance to the end of the current segment.
161+ // If the segment is in the reading list, it will be removed on the
162+ // next call to maybeReadPending.
163+ // If the segment is still in the writing list, we can't discard it
164+ // until the writer loop is done with it, but we can hope that advancing
165+ // to the current write position will get us out of our error state.
166+ dq .segments .nextReadPosition = segment .byteCount
167+
185168 dq .logger .Errorf (
186169 "Error reading segment file %s: %v" ,
187170 dq .settings .segmentPath (segment .id ), response .err )
@@ -197,7 +180,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) {
197180 // This segment had an error, so it stays in the acked list.
198181 newAckedSegments = append (newAckedSegments , dq .segments .acked [i ])
199182 errors = append (errors ,
200- fmt .Errorf ("Couldn 't delete segment %d: %w" ,
183+ fmt .Errorf ("couldn 't delete segment %d: %w" ,
201184 dq .segments .acked [i ].id , err ))
202185 }
203186 }
@@ -209,7 +192,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) {
209192 }
210193 dq .segments .acked = newAckedSegments
211194 if len (errors ) > 0 {
212- dq .logger .Errorw ("Deleting segment files" , "errors" , errors )
195+ dq .logger .Errorw ("deleting segment files" , "errors" , errors )
213196 }
214197}
215198
@@ -304,7 +287,7 @@ func (dq *diskQueue) handleShutdown() {
304287 // delete things before the current segment.
305288 if len (dq .segments .writing ) > 0 &&
306289 finalPosition .segmentID == dq .segments .writing [0 ].id &&
307- finalPosition .offset >= dq .segments .writing [0 ].endOffset {
290+ finalPosition .byteIndex >= dq .segments .writing [0 ].byteCount {
308291 dq .handleSegmentACK (finalPosition .segmentID )
309292 } else if finalPosition .segmentID > 0 {
310293 dq .handleSegmentACK (finalPosition .segmentID - 1 )
@@ -353,43 +336,53 @@ func (segments *diskQueueSegments) readingSegment() *queueSegment {
353336 return nil
354337}
355338
339+ // If the first entry of the reading list has been completely consumed,
340+ // move it to the acking list and update the read position.
341+ func (dq * diskQueue ) maybeAdvanceReadingList () {
342+ if len (dq .segments .reading ) > 0 {
343+ segment := dq .segments .reading [0 ]
344+ if dq .segments .nextReadPosition >= segment .byteCount {
345+ dq .segments .acking = append (dq .segments .acking , dq .segments .reading [0 ])
346+ dq .segments .reading = dq .segments .reading [1 :]
347+ dq .segments .nextReadPosition = 0
348+ }
349+ }
350+ }
351+
356352// If the reading list is nonempty, and there are no outstanding read
357353// requests, send one.
358354func (dq * diskQueue ) maybeReadPending () {
359355 if dq .reading {
360356 // A read request is already pending
361357 return
362358 }
363- // Check if the next reading segment has already been completely read. (This
364- // can happen if it was being written and read simultaneously.) In this case
365- // we should move it to the acking list and proceed to the next segment.
366- if len (dq .segments .reading ) > 0 &&
367- dq .segments .nextReadOffset >= dq .segments .reading [0 ].endOffset {
368- dq .segments .acking = append (dq .segments .acking , dq .segments .reading [0 ])
369- dq .segments .reading = dq .segments .reading [1 :]
370- dq .segments .nextReadOffset = 0
371- }
359+ // If the current segment has already been completely read, move to
360+ // the next one.
361+ dq .maybeAdvanceReadingList ()
362+
372363 // Get the next available segment from the reading or writing lists.
373364 segment := dq .segments .readingSegment ()
374365 if segment == nil ||
375- dq .segments .nextReadOffset >= segmentOffset ( segment .endOffset ) {
366+ dq .segments .nextReadPosition >= segment .byteCount {
376367 // Nothing to read
377368 return
378369 }
379- if dq .segments .nextReadOffset == 0 {
380- // If we're reading the beginning of this segment, assign its firstFrameID
381- // so we can recognize its acked frames later.
382- // The first segment we read might not have its initial nextReadOffset
383- // set to 0 if the segment was already partially read on a previous run.
384- // However that can only happen when nextReadFrameID == 0, so we don't
385- // need to do anything in that case.
370+ if dq .segments .nextReadPosition == 0 {
371+ // If we're reading this segment for the first time, assign its
372+ // firstFrameID so we can recognize its acked frames later, and advance
373+ // the reading position to the end of the segment header.
374+ // The first segment we read might not have the initial nextReadPosition
375+ // set to 0 if it was already partially read on a previous run.
376+ // However that can only happen when nextReadFrameID == 0, so in that
377+ // case firstFrameID is already initialized to the correct value.
386378 segment .firstFrameID = dq .segments .nextReadFrameID
379+ dq .segments .nextReadPosition = segment .headerSize ()
387380 }
388381 request := readerLoopRequest {
389- segment : segment ,
390- startFrameID : dq .segments .nextReadFrameID ,
391- startOffset : dq .segments .nextReadOffset ,
392- endOffset : segment .endOffset ,
382+ segment : segment ,
383+ startFrameID : dq .segments .nextReadFrameID ,
384+ startPosition : dq .segments .nextReadPosition ,
385+ endPosition : segment .byteCount ,
393386 }
394387 dq .readerLoop .requestChan <- request
395388 dq .reading = true
@@ -433,18 +426,20 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) {
433426 if len (dq .segments .writing ) > 0 {
434427 segment = dq .segments .writing [len (dq .segments .writing )- 1 ]
435428 }
436- frameLen := segmentOffset ( frame .sizeOnDisk () )
429+ newSegmentSize := dq . segments . writingSegmentSize + frame .sizeOnDisk ()
437430 // If segment is nil, or the new segment exceeds its bounds,
438431 // we need to create a new writing segment.
439432 if segment == nil ||
440- dq . segments . nextWriteOffset + frameLen > dq .settings .maxSegmentOffset () {
433+ newSegmentSize > dq .settings .MaxSegmentSize {
441434 segment = & queueSegment {id : dq .segments .nextID }
442435 dq .segments .writing = append (dq .segments .writing , segment )
443436 dq .segments .nextID ++
444- dq .segments .nextWriteOffset = 0
437+ // Reset the on-disk size to its initial value, the file's header size
438+ // with no frame data.
439+ newSegmentSize = segmentHeaderSize
445440 }
446441
447- dq .segments .nextWriteOffset += frameLen
442+ dq .segments .writingSegmentSize = newSegmentSize
448443 dq .pendingFrames = append (dq .pendingFrames , segmentedFrame {
449444 frame : frame ,
450445 segment : segment ,
0 commit comments