@@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
9393 // than an entire segment all by itself (as long as it isn't, it is
9494 // guaranteed to eventually enter the queue assuming no disk errors).
9595 frameSize := request .frame .sizeOnDisk ()
96- if dq .settings .MaxSegmentSize < frameSize {
96+ if dq .settings .maxSegmentOffset () < segmentOffset ( frameSize ) {
9797 dq .logger .Warnf (
98- "Rejecting event with size %v because the maximum segment size is %v" ,
99- frameSize , dq .settings .MaxSegmentSize )
98+ "Rejecting event with size %v because the segment buffer limit is %v" ,
99+ frameSize , dq .settings .maxSegmentOffset () )
100100 request .responseChan <- false
101101 return
102102 }
@@ -326,13 +326,19 @@ func (dq *diskQueue) maybeWritePending() {
326326 // Nothing to do right now
327327 return
328328 }
329+
329330 // Remove everything from pendingFrames and forward it to the writer loop.
330331 frames := dq .pendingFrames
331332 dq .pendingFrames = nil
333+ dq .writerLoop .requestChan <- writerLoopRequest {frames : frames }
332334
333- dq .writerLoop .requestChan <- writerLoopRequest {
334- frames : frames ,
335+ // Compute the size of the request so we know how full the queue is going
336+ // to be.
337+ totalSize := uint64 (0 )
338+ for _ , sf := range frames {
339+ totalSize += sf .frame .sizeOnDisk ()
335340 }
341+ dq .writeRequestSize = totalSize
336342 dq .writing = true
337343}
338344
@@ -471,8 +477,12 @@ func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool {
471477 // left in the queue after accounting for the existing segments and the
472478 // pending writes that were already accepted.
473479 pendingBytes := uint64 (0 )
474- for _ , request := range dq .pendingFrames {
475- pendingBytes += request .frame .sizeOnDisk ()
480+ for _ , sf := range dq .pendingFrames {
481+ pendingBytes += sf .frame .sizeOnDisk ()
482+ }
483+ // If a writing request is outstanding, include it in the size total.
484+ if dq .writing {
485+ pendingBytes += dq .writeRequestSize
476486 }
477487 currentSize := pendingBytes + dq .segments .sizeOnDisk ()
478488
0 commit comments