[Typescript] fix: prevent message cache size from becoming zero #418
Lucio Franco (LucioFranco) wants to merge 1 commit into confluentinc:master from
Conversation
Fixes a critical bug where the message cache size calculation could result in 0, causing "Consume was already called" errors and stopping message processing.

## Root Cause

When consumption was slow or the message rate was low, #messageCacheMaxSize evaluated to 0. JavaScript's truthy check (`0 && callback` is falsy) caused `consume(0, callback)` to call `_consumeLoop()` instead of `_consumeNum()`, starting a persistent background loop that blocked all other workers.

## Solution

Added `Math.max(1, ...)` to ensure the cache size is always at least 1, plus defensive validation in the `consume()` method.

## When This Occurs

The bug triggers when `messagesPerMillisecondSingleWorker < 0.000333`, which happens with:

- Slow `eachMessage`/`eachBatch` callbacks (DB writes, HTTP calls, etc.)
- Low message volumes across multiple partitions
- High `partitionsConsumedConcurrently` settings

## Example

Processing 3 messages across 5 partitions taking 6 seconds:

```javascript
messagesPerMillisecondSingleWorker = 3 / 5 / 6000; // = 0.0001
Math.round(1500 * 0.0001); // = 0, bug triggers
```

Fixes confluentinc#415
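The dispatch bug described above can be sketched in miniature. This is a simplified model using the names from the description, not the library's actual code:

```javascript
// Simplified model of the dispatch described above (hypothetical;
// the real consume() in the library is more involved).
function consume(size, callback) {
  // Truthy check: when size is 0, `size && callback` is falsy,
  // so the call falls through to the persistent background loop
  // that blocks all other workers.
  if (size && callback) {
    return '_consumeNum';  // bounded fetch of `size` messages
  }
  return '_consumeLoop';   // persistent loop (the bug path)
}

// A cache size that rounds to 0 takes the wrong branch:
console.log(consume(0, () => {}));  // "_consumeLoop"

// The fix clamps the computed size to at least 1:
const size = Math.max(1, Math.round(1500 * (3 / 5 / 6000)));
console.log(consume(size, () => {}));  // "_consumeNum"
```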
Pull request overview
This PR fixes a critical bug where the message cache size calculation could incorrectly evaluate to 0, causing message processing to halt with "Consume was already called" errors. The fix ensures the cache size is always at least 1 by adding 1 inside the Math.ceil() call.
Key Changes:
- Modified cache size calculation to guarantee minimum value of 1
- Prevents the cache size from rounding down to 0 when message consumption is slow or message rates are low
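As a sketch of that guarantee (the exact expression in the diff may differ; `windowMs` and the rounding used here are assumptions taken from the example in this thread), adding 1 before `Math.ceil()` keeps the result at 1 or more for any non-negative rate:

```javascript
// Hypothetical before/after of the cache-size calculation; the real
// expression in the PR may differ.
const windowMs = 1500;            // assumed sizing window from the example
const rate = 3 / 5 / 6000;        // 0.0001 messages/ms for a single worker

const before = Math.round(windowMs * rate);    // 0, which triggers the bug
const after = Math.ceil(windowMs * rate + 1);  // 2, always >= 1 when rate >= 0

console.log(before, after);
```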
thanks Lucio Franco (@LucioFranco)!
cc Robert Yokota (@rayokota) and Emanuele Sabellico (@emasab), any chance this could get looked at?
I would likewise appreciate some traction here, especially as it seems that a new release is being prepared. My team is attempting to migrate a service from
Lucio Franco (@LucioFranco) thanks a lot for the report and the proposed fix. I've created #424 for a different approach to the fix that avoids adding +1 in any case, but most of the time the effect is the same because #maxBatchSize cannot be lower than 1, so #maxBatchesSize = #maxBatchSize * #concurrency cannot be lower than 1 * #concurrency.
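The invariant in that comment can be checked in a few lines. Values here are illustrative, and the `#`-prefixed names are private class members in the actual code:

```javascript
// Illustrative check: because #maxBatchSize is clamped to at least 1,
// #maxBatchesSize = #maxBatchSize * #concurrency can never fall below
// the concurrency, so it never reaches 0.
function maxBatchesSize(maxBatchSize, concurrency) {
  const clamped = Math.max(1, maxBatchSize); // cannot be lower than 1
  return clamped * concurrency;
}

console.log(maxBatchesSize(0, 4)); // 4, never 0 even when the raw size rounds down to 0
console.log(maxBatchesSize(3, 4)); // 12
```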
Emanuele Sabellico (@emasab) sounds good, thanks! I will close this.