@@ -58,10 +58,15 @@ func (dq *diskQueue) run() {
5858 // The writer loop completed a request, so check if there is more
5959 // data to be sent.
6060 dq .maybeWritePending ()
61- // We also check whether the reader loop is waiting for the data
62- // that was just written.
61+
62+ // The data that was just written is now available for reading, so check
63+ // if we should start a new read request.
6364 dq .maybeReadPending ()
6465
66+ // pendingFrames should now be empty. If any producers were blocked
67+ // because pendingFrames hit settings.WriteAheadLimit, wake them up.
68+ dq .maybeUnblockProducers ()
69+
6570 // Reader loop handling
6671 case readerLoopResponse := <- dq .readerLoop .responseChan :
6772 dq .handleReaderLoopResponse (readerLoopResponse )
@@ -417,22 +422,25 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) {
417422 })
418423}
419424
420- // canAcceptFrameOfSize checks whether there is enough free space in the
421- // queue (subject to settings.MaxBufferSize) to accept a new frame with
422- // the given size. Size includes both the serialized data and the frame
423- // header / footer; the easy way to do this for a writeFrame is to pass
425+ // canAcceptFrameOfSize checks whether there is enough free space in the queue
426+ // (subject to settings.{ MaxBufferSize, WriteAheadLimit} ) to accept a new
427+ // frame with the given size. Size includes both the serialized data and the
428+ // frame header / footer; the easy way to do this for a writeFrame is to pass
424429// in frame.sizeOnDisk().
425430// Capacity calculations do not include requests in the blockedProducers
426431// list (that data is owned by its callers and we can't touch it until
427432// we are ready to respond). That allows this helper to be used both while
428433// handling producer requests and while deciding whether to unblock
429434// producers after free capacity increases.
430- // If we decide to add limits on how many events / bytes can be stored
431- // in pendingFrames (to avoid unbounded memory use if the input is faster
432- // than the disk), this is the function to modify.
433435func (dq * diskQueue ) canAcceptFrameOfSize (frameSize uint64 ) bool {
436+ // If pendingFrames is already at the WriteAheadLimit, we can't accept
437+ // any new frames right now.
438+ if len (dq .pendingFrames ) >= dq .settings .WriteAheadLimit {
439+ return false
440+ }
441+
442+ // If the queue size is unbounded (max == 0), we accept.
434443 if dq .settings .MaxBufferSize == 0 {
435- // Currently we impose no limitations if the queue size is unbounded.
436444 return true
437445 }
438446
0 commit comments