Description
Search before asking
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
master
Minimal reproduce step
Fixing some issues as part of #23231, such as:
Lines 790 to 791 in 1439529
int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
availablePermits = Math.min(availablePermits, remainUnAckedMessages);
The problem is that permits are counted in batch messages, while unacked messages are counted in individual messages. These two units shouldn't be mixed. A batch message count can be converted to an estimated number of individual messages by using the average number of messages per batch, a value that the consumer already keeps track of.
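A minimal sketch of the unit conversion described above, assuming that `availablePermits` is counted in batch messages (entries), that the unacked limit and count are in individual messages, and that an average-messages-per-batch value is available. `PermitUnits`, `entriesAllowedByUnackedLimit` and `capPermits` are hypothetical names, not Pulsar's API; treating a non-positive limit as "no limit" and always allowing at least one batch are also assumptions made for illustration:

```java
/**
 * Sketch: cap dispatcher permits (counted in batch messages) by the remaining
 * unacked budget (counted in individual messages), converting units before comparing.
 * All names are hypothetical and not part of Pulsar's API.
 */
final class PermitUnits {

    private PermitUnits() {
    }

    /**
     * Converts the remaining unacked-message budget into an estimated number of batch messages.
     *
     * @param maxUnackedMessages  limit in individual messages (assumed: values <= 0 mean no limit)
     * @param unackedMessages     current unacked count in individual messages
     * @param avgMessagesPerBatch running average of individual messages per batch message
     * @return estimated number of batch messages that fit in the remaining budget
     */
    static int entriesAllowedByUnackedLimit(int maxUnackedMessages, int unackedMessages,
                                            double avgMessagesPerBatch) {
        if (maxUnackedMessages <= 0) {
            return Integer.MAX_VALUE; // assumed: no unacked limit configured
        }
        int remainUnAckedMessages = Math.max(maxUnackedMessages - unackedMessages, 0);
        if (remainUnAckedMessages == 0) {
            return 0; // budget exhausted: dispatch nothing
        }
        // A batch holds at least one message; this also covers an average that is not known yet (0).
        double avg = Math.max(avgMessagesPerBatch, 1.0);
        // Round down so the estimate does not overshoot the budget, but always allow one batch
        // so the consumer keeps making progress (the limit may be exceeded by at most one batch).
        return Math.max(1, (int) (remainUnAckedMessages / avg));
    }

    /** Returns availablePermits capped by the unacked budget, with both sides in batch-message units. */
    static int capPermits(int availablePermits, int maxUnackedMessages, int unackedMessages,
                          double avgMessagesPerBatch) {
        return Math.min(availablePermits,
                entriesAllowedByUnackedLimit(maxUnackedMessages, unackedMessages, avgMessagesPerBatch));
    }
}
```

For example, with 1000 available permits, a limit of 50,000 unacked messages, 49,000 already unacked and an average of 10 messages per batch, the mixed-unit code yields min(1000, 1000) = 1000 batch messages (roughly 10,000 individual messages), whereas the converted calculation yields 100 batch messages (roughly 1,000 individual messages), which matches the remaining budget.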
There are several locations where this is mixed up.
This also applies to the documentation. The javadoc of receiverQueueSize is very confusing and doesn't say that the size is counted in batch messages:
pulsar/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
Lines 356 to 387 in 10f4e02
/**
 * Sets the size of the consumer receive queue.
 *
 * <p>The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
 * application calls {@link Consumer#receive()}. Using a higher value can potentially increase consumer
 * throughput at the expense of bigger memory utilization.
 *
 * <p>For the consumer that subscribes to the partitioned topic, the parameter
 * {@link ConsumerBuilder#maxTotalReceiverQueueSizeAcrossPartitions} also affects
 * the number of messages accumulated in the consumer.
 *
 * <p><b>Setting the consumer queue size as zero</b>
 * <ul>
 * <li>Decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the
 * message distribution on shared subscriptions by pushing messages only to the consumers that are ready to process
 * them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue
 * size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
 * zero.</li>
 * <li>Doesn't support Batch-Message. If a consumer receives a batch-message, it closes the consumer connection with
 * the broker and {@link Consumer#receive()} calls remain blocked while {@link Consumer#receiveAsync()} receives
 * exception in callback.
 *
 * <b> The consumer is not able to receive any further messages unless batch-message in pipeline
 * is removed.</b></li>
 * </ul>
 * The default value is {@code 1000} messages and should be adequate for most use cases.
 *
 * @param receiverQueueSize
 *            the new receiver queue size value
 * @return the consumer builder instance
 */
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
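To show why "1000 messages" is ambiguous when, as this issue states, the queue size is counted in batch messages, here is a rough estimate of how many individual messages could accumulate: the queue size multiplied by the average number of messages per batch. This is a sketch under that assumption; `ReceiverQueueEstimate` and `estimatedIndividualMessages` are hypothetical names used only for illustration:

```java
/**
 * Sketch: estimate how many individual messages a receiver queue sized in
 * batch messages may hold. Hypothetical helper, not part of Pulsar's API.
 */
final class ReceiverQueueEstimate {

    private ReceiverQueueEstimate() {
    }

    /**
     * @param receiverQueueSize   queue size, assumed to be counted in batch messages
     * @param avgMessagesPerBatch average number of individual messages per batch message
     * @return estimated number of individual messages, rounded up
     */
    static long estimatedIndividualMessages(int receiverQueueSize, double avgMessagesPerBatch) {
        // A batch message contains at least one individual message; 0 means "unknown".
        double avg = Math.max(avgMessagesPerBatch, 1.0);
        return (long) Math.ceil(receiverQueueSize * avg);
    }
}
```

With the documented default of 1000 and batches averaging 100 messages, the estimate is 100,000 individual messages rather than 1000, a hundredfold difference in potential memory use that the current javadoc wording does not make visible.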
What did you expect to see?
The permit calculations in dispatchers shouldn't mix individual message counts and batch message counts.
The documentation needs updates too so that it's clearly defined whether a "message" refers to an individual message or a batch message (a batch of individual messages).
What did you see instead?
- Calculations mix the two units in several locations.
- The documentation doesn't clearly define whether a message is an individual message or a batch of messages. An example is the receiverQueueSize javadoc.
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!