
Conversation

@3pacccccc
Contributor

Motivation

Currently, ZeroQueueConsumerImpl with MessagePayloadProcessor enabled can incorrectly receive batch messages when it shouldn't. This occurs because:

  1. The message payload processor processes batch messages before checking if the consumer is a zero-queue consumer
  2. This contradicts the expected behavior of zero-queue consumers, which should not buffer any messages

The issue can be reproduced when:

  • Using a MessagePayloadProcessor that processes batch messages

Modifications

  1. Added early check in processPayloadByProcessor to detect zero-queue consumers receiving batch messages
  2. For zero-queue consumers receiving batch messages, bypass the payload processor and directly use receiveIndividualMessagesFromBatch (see the sketch after this list)
  3. Added new test case testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage to verify the fix
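
A minimal sketch of the guard described in items 1 and 2 above (the exact condition, field names, and surrounding signature are assumptions based on this description, not the merged code):

// Sketch only: an early check at the top of ConsumerImpl#processPayloadByProcessor.
// conf, msgMetadata and the other arguments are assumed to be the ones already in scope there.
if (conf.getReceiverQueueSize() == 0 && msgMetadata.getNumMessagesInBatch() > 1) {
    // A zero-queue consumer must not buffer a batch, so skip the payload processor and hand the
    // batch to the regular batch path; ZeroQueueConsumerImpl overrides that method to close the
    // consumer and surface an InvalidMessageException (see the method quoted later in this thread).
    receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet,
            uncompressedPayload, messageId, cnx, consumerEpoch, isEncrypted);
    return;
}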

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Added testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage, which verifies the behavior of a zero-queue consumer with a MessagePayloadProcessor when a batch message is received

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: 3pacccccc#20

@github-actions bot added the doc-not-needed (Your PR changes do not impact docs) label on Aug 5, 2025
Member

@lhotari lhotari left a comment


The motivation isn't very clear yet.

Currently, ZeroQueueConsumerImpl with MessagePayloadProcessor enabled can incorrectly receive batch messages when it shouldn't. This occurs because:

  1. The message payload processor processes batch messages before checking if the consumer is a zero-queue consumer
  2. This behavior contradicts the expected behavior of zero-queue consumers which should not buffer any messages

Answering these questions would help:

  1. Please elaborate more on why you think that this contradicts expected behavior of zero-queue consumers?
  2. What do you consider as "buffering"?
  3. What is your use case?

@3pacccccc
Contributor Author

Thanks for your review, @lhotari. Here are my answers to your questions:

1. Please elaborate more on why you think that this contradicts the expected behavior of zero-queue consumers?

The current behavior with MessagePayloadProcessor creates an unpredictable interaction pattern. Here's a detailed breakdown of what happens:

// Assumes pulsarClient, topicName, subscriptionName and CustomProcessor (a MessagePayloadProcessor)
// are already set up; the values below are illustrative.
int totalMessages = 10;
String messagePredicate = "my-message-";

Producer<byte[]> producer = pulsarClient.newProducer()
        .topic(topicName)
        .enableBatching(true)
        .batchingMaxMessages(5)   // up to 5 messages per batch
        .create();

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
        .topic(topicName)
        .messagePayloadProcessor(new CustomProcessor())
        .subscriptionName(subscriptionName)
        .receiverQueueSize(0)     // zero-queue consumer
        .subscribe();

for (int i = 0; i < totalMessages; i++) {
    String message = messagePredicate + i;
    producer.sendAsync(message.getBytes());
}
producer.flush();

for (int i = 0; i < 10; i++) {
    Message<byte[]> receive = consumer.receive();   // eventually blocks indefinitely (see the sequence below)
}

In this case, here is the step-by-step sequence of what happens on the consumer and broker sides:

  1. Consumer: receive() is called; the consumer sends 1 flow permit to the broker and blocks waiting for messages.
  2. Broker: receives the flow permit and sends 5 messages (one batch) to the consumer, setting messagePermits to -4.
  3. Consumer: receives the 5 messages as a batch, processes the batch via processPayloadByProcessor, and puts the messages into the incomingMessages queue; 1 message is consumed.
  4. Consumer: receive() is triggered once more; it finds messages in incomingMessages (maybe 1, maybe more, maybe none, depending entirely on the processing speed of the user's custom MessagePayloadProcessor); assume there are 4 messages in the incoming queue, and they are all released.
  5. Consumer: sends 1 flow permit to the broker and blocks waiting for new messages.
  6. Broker: receives the flow permit, does not send a message, and sets messagePermits to -3.

So, as you can see, in this case the zero-queue consumer ends up blocked indefinitely after consuming 1 message. It might instead block after consuming 2 or more messages, or might not block at all; this depends entirely on the processing speed of the user's custom MessagePayloadProcessor.

Key Problems:

  • Permit Mismatch: The broker sends more messages (a whole batch) than were requested (see the toy model after this list)

  • Hidden Buffering: Messages accumulate in incomingMessages despite queueSize=0

  • Unpredictable Blocking: Behavior depends on processor speed, violating zero-queue guarantees
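
To make the permit mismatch concrete, here is a toy model of the accounting from the sequence above (plain Java arithmetic, not Pulsar code):

// Toy model: the broker only dispatches while availablePermits > 0.
int availablePermits = 0;

availablePermits += 1;           // receive(): the zero-queue consumer grants exactly one flow permit
int batchSize = 5;
availablePermits -= batchSize;   // the broker dispatches the whole 5-message batch: permits drop to -4

availablePermits += 1;           // the next receive() grants one more permit: still -3
boolean brokerWillDispatch = availablePermits > 0;   // false, so the consumer blocks indefinitely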

2. What do you consider as "buffering"?

In my understanding, the fundamental design principle of ZeroQueueConsumer (receiverQueueSize=0) is to be a strictly non-buffering consumer that:

  • Operates in pure request-response mode:

    • Each receive() call should trigger exactly one network request to the broker

    • No messages should be retained client-side between receive() calls

3. What is your use case?

We need ZeroQueueConsumer to:

  1. Maintain strict pull semantics: each receive() should trigger exactly one broker interaction (as sketched below)

  2. Handle batches predictably: either reject batches entirely, or process them atomically without intermediate buffering
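
A minimal sketch of the pull pattern we rely on (the topic and subscription names are made up, and process() stands for the application's handler):

// Inside a method that declares `throws PulsarClientException`.
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("persistent://public/default/orders")   // hypothetical topic
        .subscriptionName("orders-zero-queue-sub")      // hypothetical subscription
        .receiverQueueSize(0)                           // zero-queue: no client-side prefetch
        .subscribe();

while (true) {
    Message<byte[]> msg = consumer.receive();   // expected: exactly one broker interaction per call
    process(msg);                               // hypothetical application handler
    consumer.acknowledge(msg);
}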

@lhotari
Member

lhotari commented Aug 6, 2025

In my understanding, the fundamental design principle of ZeroQueueConsumer (receiverQueueSize=0) is to be a strictly non-buffering consumer that:

  • Operates in pure request-response mode:

    • Each receive() call should trigger exactly one network request to the broker
    • No messages should be retained client-side between receive() calls

I don't think that this is a correct assumption. In Pulsar, the receive method returns the individual messages contained in a batch.
When the broker sends a batch message to the client, it should be buffered on the client side, also with ZeroQueueConsumer.

It's very much possible that you have run into bugs in the ZeroQueueConsumerImpl implementation. Looking at the implementation, it doesn't seem to make a distinction for batch messages. In the Pulsar code it's a bit of a mess that in some cases the permits refer to batch messages and in other cases to individual messages. I have created an issue about that in #23263. It's possible that the information in the issue isn't accurate.

@3pacccccc
Contributor Author

In my understanding, the fundamental design principle of ZeroQueueConsumer (receiverQueueSize=0) is to be a strictly non-buffering consumer that:

  • Operates in pure request-response mode:

    • Each receive() call should trigger exactly one network request to the broker
    • No messages should be retained client-side between receive() calls

I don't think that this is a correct assumption. In Pulsar, the receive method returns the individual messages contained in a batch. When the broker sends a batch message to the client, it should be buffered on the client side, also with ZeroQueueConsumer.

It's very much possible that you have run into bugs in the ZeroQueueConsumerImpl implementation. Looking at the implementation, it doesn't seem to make a distinction for batch messages. In the Pulsar code it's a bit of a mess that in some cases the permits refer to batch messages and in other cases to individual messages. I have created an issue about that in #23263. It's possible that the information in the issue isn't accurate.

You can reproduce this issue with the example I mentioned above. So I'm wondering: is it OK that the consumer gets blocked indefinitely?

@lhotari
Member

lhotari commented Aug 6, 2025

I searched for ZeroQueueConsumerImpl-related issues.

Now I finally understand the motivation. It appears that ZeroQueueConsumerImpl isn't intended to be used with batch messages at all, which is a bit confusing. The issue #11850 hints at that. It refers to this method:

@Override
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
                                        int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
                                        MessageIdData messageId, ClientCnx cnx, long consumerEpoch,
                                        boolean isEncrypted) {
    log.warn(
            "Closing consumer [{}]-[{}] due to unsupported received batch-message with zero receiver queue size",
            subscription, consumerName);
    // close connection
    closeAsync().handle((ok, e) -> {
        // notify callback with failure result
        notifyPendingReceivedCallback(null,
                new PulsarClientException.InvalidMessageException(
                        format("Unsupported Batch message with 0 size receiver queue for [%s]-[%s] ",
                                subscription, consumerName)));
        return null;
    });
}

Which is the same method that you are calling.

I wasn't aware of this. I'll review the changes again.

@3pacccccc 3pacccccc requested a review from lhotari August 7, 2025 06:03
@lhotari
Member

lhotari commented Aug 7, 2025

Please add javadoc to ConsumerBuilder.messagePayloadProcessor about the behavior with receiverQueueSize=0 (don't mention ZeroQueueConsumerImpl in the ConsumerBuilder javadoc since it is an implementation detail):

/**
* If configured with a non-null value, the consumer uses the processor to process the payload, including
* decoding it to messages and triggering the listener.
*
* Default: null
*/
ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor);

This is the receiverQueueSize javadoc:

/**
* Sets the size of the consumer receive queue.
*
* <p>The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the
* application calls {@link Consumer#receive()}. Using a higher value can potentially increase consumer
* throughput at the expense of bigger memory utilization.
*
* <p>For the consumer that subscribes to the partitioned topic, the parameter
* {@link ConsumerBuilder#maxTotalReceiverQueueSizeAcrossPartitions} also affects
* the number of messages accumulated in the consumer.
*
* <p><b>Setting the consumer queue size as zero</b>
* <ul>
* <li>Decreases the throughput of the consumer by disabling pre-fetching of messages. This approach improves the
* message distribution on shared subscriptions by pushing messages only to the consumers that are ready to process
* them. Neither {@link Consumer#receive(int, TimeUnit)} nor Partitioned Topics can be used if the consumer queue
* size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
* zero.</li>
* <li>Doesn't support Batch-Message. If a consumer receives a batch-message, it closes the consumer connection with
* the broker and {@link Consumer#receive()} calls remain blocked while {@link Consumer#receiveAsync()} receives
* exception in callback.
*
* <b> The consumer is not able to receive any further messages unless batch-message in pipeline
* is removed.</b></li>
* </ul>
* The default value is {@code 1000} messages and should be adequate for most use cases.
*
* @param receiverQueueSize
* the new receiver queue size value
* @return the consumer builder instance
*/
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
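
For illustration, a sketch of what the requested note in the messagePayloadProcessor javadoc might look like (the wording is an assumption; the text actually merged in this PR may differ):

/**
 * If configured with a non-null value, the consumer uses the processor to process the payload, including
 * decoding it to messages and triggering the listener.
 *
 * <p>When {@link #receiverQueueSize(int)} is set to 0, batch messages are not handed to the payload
 * processor; the consumer falls back to its default batch handling, which does not support batch messages
 * and closes the connection when one is received.
 *
 * Default: null
 */
ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor);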

Member

@lhotari lhotari left a comment


LGTM

@3pacccccc
Contributor Author

@lhotari I've added javadoc, please take a look again.

@3pacccccc 3pacccccc requested a review from lhotari August 7, 2025 14:48
@lhotari
Member

lhotari commented Aug 7, 2025

Btw, not directly related to this PR, but could you check if there's any use for the PulsarClientException.InvalidMessageException that is thrown? Does it get used at all? I think we have made changes to the closing of the consumer so that it fails all pending operations with PulsarClientException.AlreadyClosedException.

@codecov-commenter

codecov-commenter commented Aug 7, 2025

Codecov Report

❌ Patch coverage is 62.50000% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.37%. Comparing base (dedba1a) to head (bec30ca).
⚠️ Report is 5 commits behind head on master.

Files with missing lines Patch % Lines
...ache/pulsar/client/impl/ZeroQueueConsumerImpl.java 62.50% 2 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24610      +/-   ##
============================================
- Coverage     74.41%   74.37%   -0.05%     
- Complexity    32808    33176     +368     
============================================
  Files          1881     1881              
  Lines        146776   146823      +47     
  Branches      16857    16862       +5     
============================================
- Hits         109226   109194      -32     
- Misses        28901    28953      +52     
- Partials       8649     8676      +27     
Flag Coverage Δ
inttests 26.66% <0.00%> (-0.22%) ⬇️
systests 23.31% <0.00%> (-0.11%) ⬇️
unittests 73.85% <62.50%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 80.10% <ø> (-0.41%) ⬇️
...ache/pulsar/client/impl/ZeroQueueConsumerImpl.java 76.13% <62.50%> (-0.41%) ⬇️

... and 99 files with indirect coverage changes


@3pacccccc
Contributor Author

Btw, not directly related to this PR, but could you check if there's any use for the PulsarClientException.InvalidMessageException that is thrown? Does it get used at all? I think we have made changes to the closing of the consumer so that it fails all pending operations with PulsarClientException.AlreadyClosedException.

@lhotari OK, I'm glad to do that

@3pacccccc
Contributor Author

3pacccccc commented Aug 9, 2025

Btw, not directly related to this PR, but could you check if there's any use for the PulsarClientException.InvalidMessageException that is thrown? Does it get used at all? I think we have made changes to the closing of the consumer so that it fails all pending operations with PulsarClientException.AlreadyClosedException.

Hi Lari (@lhotari),
I've checked all the InvalidMessageException usages:

  • most are on the producer side (not related to our current changes)

  • a few are on the consumer side (mainly for message validation purposes)

The current usage looks correct and doesn't need modification. However, I did notice and fix some inaccurate javadocs; please take a look when you have time.

@Technoboy- Technoboy- added this to the 4.1.0 milestone Aug 11, 2025
@Technoboy- Technoboy- merged commit bbf56f6 into apache:master Aug 12, 2025
51 checks passed
@3pacccccc
Contributor Author

@Technoboy- @lhotari
Hi Jiwei, Lari. I think this PR might need to be cherry-picked to branch-3.x and branch-4.x because:
  1. The issue fixed by this PR also exists in branch-3.x and branch-4.x
  2. The javadoc behavior described in this PR is also present in branch-3.x and branch-4.x

gaozhangmin pushed a commit to gaozhangmin/pulsar that referenced this pull request Aug 13, 2025
lhotari pushed a commit that referenced this pull request Aug 14, 2025
…when using MessagePayloadProcessor (#24610)

(cherry picked from commit bbf56f6)
lhotari pushed a commit that referenced this pull request Aug 14, 2025
…when using MessagePayloadProcessor (#24610)

(cherry picked from commit bbf56f6)
lhotari pushed a commit that referenced this pull request Aug 14, 2025
…when using MessagePayloadProcessor (#24610)

(cherry picked from commit bbf56f6)
poorbarcode pushed a commit to poorbarcode/pulsar that referenced this pull request Aug 14, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 20, 2025
…when using MessagePayloadProcessor (apache#24610)

(cherry picked from commit bbf56f6)
(cherry picked from commit f8ea061)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 20, 2025
…when using MessagePayloadProcessor (apache#24610)

(cherry picked from commit bbf56f6)
(cherry picked from commit f8ea061)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 20, 2025
…when using MessagePayloadProcessor (apache#24610)

(cherry picked from commit bbf56f6)
(cherry picked from commit b26934b)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 26, 2025
…when using MessagePayloadProcessor (apache#24610)

(cherry picked from commit bbf56f6)
(cherry picked from commit b26934b)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Aug 26, 2025
…when using MessagePayloadProcessor (apache#24610)

(cherry picked from commit bbf56f6)
Technoboy- pushed a commit to Technoboy-/pulsar that referenced this pull request Sep 10, 2025
…when using MessagePayloadProcessor (apache#24610)

(cherry picked from commit bbf56f6)
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
