Configurable batch size #393
Conversation
Pull Request Overview
This PR implements a configurable batch size for consumer operations through the new JavaScript-specific properties `js.consumer.max.batch.size` and `js.consumer.max.cache.size.per.worker.ms`. The changes include a new cache-size calculation algorithm based on consumption-rate estimation, and fixes for message-cache management during rebalances.
Key changes:
- Adds configuration options for batch size (-1 for unlimited) and cache size based on consumption rate
- Replaces the fixed cache increase/decrease logic with dynamic size calculation based on message consumption rate
- Introduces message return mechanism to preserve at-least-once delivery guarantees during pending operations
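The consumption-rate-based sizing in the second bullet can be sketched roughly as follows. This is an illustrative reconstruction, not the PR's actual code: the function name, parameters, and rounding are assumptions; only the idea (cache size = estimated messages per millisecond × configured milliseconds) comes from the description.

```javascript
// Hypothetical sketch of consumption-rate-based cache sizing.
// `maxCacheSizePerWorkerMs` plays the role of the new
// `js.consumer.max.cache.size.per.worker.ms` property.
function targetCacheSize(messagesConsumed, elapsedMs, maxCacheSizePerWorkerMs, workers) {
  // Estimate how many messages are consumed per millisecond.
  const ratePerMs = messagesConsumed / elapsedMs;
  // Keep roughly `maxCacheSizePerWorkerMs` worth of messages cached per worker,
  // never shrinking below a single message.
  return Math.max(1, Math.round(ratePerMs * maxCacheSizePerWorkerMs * workers));
}

// Example: 3000 messages consumed in 1000 ms, a 1500 ms window, 2 workers.
console.log(targetCacheSize(3000, 1000, 1500, 2)); // → 9000
```

Compared to the fixed increase/decrease steps it replaces, a rate-based target adapts the cache to how fast the application actually drains messages.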
Reviewed Changes
Copilot reviewed 12 out of 14 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| lib/kafkajs/_consumer.js | Implements new configuration properties, dynamic cache sizing, and message return logic |
| lib/kafkajs/_consumer_cache.js | Adds methods to prepend messages back to cache for reprocessing |
| test/promisified/consumer/consumerCacheTests.spec.js | Updates test parameters and timing to work with new batch size behavior |
| test/promisified/consumer/consumeMessages.spec.js | Adjusts tests for dynamic batch sizes and adds producer disconnect calls |
| test/promisified/admin/fetch_offsets.spec.js | Fixes timing issue with offset commit test |
| test/promisified/producer/flush.spec.js | Adds missing producer initialization and cleanup |
| MIGRATION.md | Documents new configuration properties |
| CHANGELOG.md | Adds release notes for v1.6.1 |
| package.json, schemaregistry/package.json, lib/util.js | Version bumps to 1.6.1 |
| ci/update-version.js | Fixes prerelease version separator |
```diff
    firstLongBatchProcessing = receivedMessages;
  }
- if (receivedMessages === 14) {
+ if (firstLongBatchProcessing && receivedMessages === firstLongBatchProcessing + 1) {
```
Potential logic error: the condition checks `receivedMessages === firstLongBatchProcessing + 1`, but `firstLongBatchProcessing` is set inside a condition that may be true multiple times. If multiple batches have >= 32 messages, `firstLongBatchProcessing` will be set multiple times, causing this condition to trigger incorrectly.
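One way to address the comment is to guard the assignment so only the first long batch is recorded. This is a hedged sketch of the suggested fix, not the test's actual code; `onBatch`, the counters, and the threshold of 32 are stand-ins inferred from the snippet.

```javascript
// Record the position of the first "long" batch (>= 32 messages) exactly once.
let firstLongBatchProcessing = null;
let receivedMessages = 0;

function onBatch(batch) {
  receivedMessages += batch.length;
  // Without the null check, every large batch would overwrite the marker,
  // which is the bug the review comment points out.
  if (batch.length >= 32 && firstLongBatchProcessing === null) {
    firstLongBatchProcessing = receivedMessages;
  }
}

onBatch(new Array(32)); // first long batch: marker set to 32
onBatch(new Array(40)); // second long batch: marker unchanged
console.log(firstLongBatchProcessing); // → 32
```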
```js
for (const msg of messages) {
  const key = partitionKey(msg);
  partitionsNum.set(key, 1);
  if (partitionsNum.size >= this.#concurrency) {
    break;
  }
}
```
The loop counts unique partition keys but always sets the value to 1. This could be simplified by using a `Set` instead of a `Map`, which would make the intent clearer and slightly improve performance.
Milind L (milindl)
left a comment
Minor comments; looks generally OK, and I'd seen the previous PR.
CHANGELOG.md
Outdated
```diff
@@ -1,3 +1,16 @@
+# confluent-kafka-javascript 1.6.1
+
+v1.6.1 is a maintenance release. It is supported for all usage.
```
It should be 1.7.0
MIGRATION.md
Outdated
```
and defaults to 32. `js.consumer.max.cache.size.per.worker.ms` allows to
configure the cache size estimated based on consumption rate and defaults
to 1.5 seconds.
```

Suggested change:

```suggestion
and defaults to 32. `js.consumer.max.cache.size.per.worker.ms` allows to
configure the cache size estimated based on consumption rate and defaults to the cache being sized to 1.5s worth of messages.
```
Not sure about this suggestion exactly, but at the moment I feel that `js.consumer.max.cache.size.per.worker.ms` is not explained fully.
Closes #286.
Batch size is now configurable through the `js.consumer.max.batch.size` property (-1 for unlimited batch size). Even when unlimited, batch sizes are still limited by the librdkafka consumer buffer. `js.consumer.max.cache.size.per.worker.ms` allows configuring the amount of data kept in the buffer, in terms of milliseconds, based on the estimated consume rate.

Checklist
Test & Review

Verified with the automatic tests and by running the performance example.
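The new properties described above would be set alongside the usual consumer configuration. This is a hedged sketch: the package name matches the published npm package, but the exact config shape (flat keys vs. a nested `kafkaJS` block) and the chosen values are assumptions for illustration, and constructing the consumer does not connect to a broker.

```javascript
// Sketch: configuring the new js.* consumer properties (values are examples).
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

const kafka = new Kafka({ 'bootstrap.servers': 'localhost:9092' });

const consumer = kafka.consumer({
  'group.id': 'batch-size-demo',
  // Cap each batch at 64 messages; -1 would mean unlimited, though batches
  // are still bounded by librdkafka's own consumer buffer.
  'js.consumer.max.batch.size': 64,
  // Keep roughly 1.5s worth of messages cached per worker, sized from the
  // estimated consumption rate.
  'js.consumer.max.cache.size.per.worker.ms': 1500,
});
```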