Skip to content

[Typescript] fix prevent message cache size from becoming zero#418

Closed
Lucio Franco (LucioFranco) wants to merge 1 commit intoconfluentinc:masterfrom
LucioFranco:lucio/push-qovsqlvsxsnu
Closed

[Typescript] fix prevent message cache size from becoming zero#418
Lucio Franco (LucioFranco) wants to merge 1 commit intoconfluentinc:masterfrom
LucioFranco:lucio/push-qovsqlvsxsnu

Conversation

@LucioFranco

Fixes a critical bug where the message cache size calculation could result in 0, causing "Consume was already called" errors and stopping message processing.

Root Cause

When consumption was slow or message rate was low, #messageCacheMaxSize calculated to 0. JavaScript's truthy check (0 && callback is falsy) caused consume(0, callback) to call _consumeLoop() instead of _consumeNum(), starting a persistent background loop that blocked all other workers.

Solution

Added Math.max(1, ...) to ensure cache size is always at least 1, plus defensive validation in the consume() method.

When This Occurs

The bug triggers when messagesPerMillisecondSingleWorker < 0.000333, which happens with:

  • Slow eachMessage/eachBatch callbacks (DB writes, HTTP calls, etc.)
  • Low message volumes across multiple partitions
  • High partitionsConsumedConcurrently settings

Example

Processing 3 messages across 5 partitions taking 6 seconds:

messagesPerMillisecondSingleWorker = 3 / 5 / 6000 = 0.0001
Math.round(1500 * 0.0001) = 0  // Bug triggers

Fixes #415

Fixes a critical bug where the message cache size calculation could result in 0,
causing "Consume was already called" errors and stopping message processing.

## Root Cause
When consumption was slow or message rate was low, #messageCacheMaxSize
calculated to 0. JavaScript's truthy check (0 && callback is falsy) caused consume(0,
callback) to call _consumeLoop() instead of _consumeNum(), starting a persistent
background loop that blocked all other workers.

## Solution
Added Math.max(1, ...) to ensure cache size is always at least 1, plus
defensive validation in the consume() method.

## When This Occurs
The bug triggers when messagesPerMillisecondSingleWorker < 0.000333, which happens
with:
- Slow eachMessage/eachBatch callbacks (DB writes, HTTP calls, etc.)
- Low message volumes across multiple partitions
- High partitionsConsumedConcurrently settings

## Example
Processing 3 messages across 5 partitions taking 6 seconds:
```javascript
messagesPerMillisecondSingleWorker = 3 / 5 / 6000 = 0.0001
Math.round(1500 * 0.0001) = 0  // Bug triggers
```

Fixes confluentinc#415
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a critical bug where the message cache size calculation could incorrectly evaluate to 0, causing message processing to halt with "Consume was already called" errors. The fix ensures the cache size is always at least 1 by adding 1 inside the Math.ceil() call before performing the calculation.

Key Changes:

  • Modified cache size calculation to guarantee minimum value of 1
  • Prevents division-by-zero-like behavior when message consumption is slow or message rates are low

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@PabloReszczynski

thanks Lucio Franco (@LucioFranco) !

@LucioFranco
Copy link
Author

cc Robert Yokota (@rayokota) Emanuele Sabellico (@emasab) any chance this could get looked at?

@WaffleStudios

I would likewise appreciate some traction here, especially as it seems that a new release is being prepared.

My team is attempting to migrate a service from kafkajs, and this bug is a critical issue preventing us from moving forward.

@emasab
Copy link
Contributor

Lucio Franco (@LucioFranco) thanks a lot for the report and the proposed fix. I've created #424 for a different approach for the fix to avoid adding +1 in any case but most of the times the effect is the same because #maxBatchSize cannot be lower that 1 so #maxBatchesSize = #maxBatchSize * #concurrency cannot be lower that 1 * #concurrency.

@LucioFranco
Copy link
Author

Emanuele Sabellico (@emasab) sounds good, thanks! I will close this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Error: Consume was already called + TypeError: messages is not iterable

5 participants