[fix][client]Prevent ZeroQueueConsumer from receiving batch messages when using MessagePayloadProcessor #24610
lhotari
left a comment
The motivation isn't very clear yet.
Currently, ZeroQueueConsumerImpl with MessagePayloadProcessor enabled can incorrectly receive batch messages when it shouldn't. This occurs because:
- The message payload processor processes batch messages before checking if the consumer is a zero-queue consumer
- This behavior contradicts the expected behavior of zero-queue consumers which should not buffer any messages
Answering these questions would help:
- Please elaborate more on why you think that this contradicts expected behavior of zero-queue consumers?
- What do you consider as "buffering"?
- What is your use case?
|
Thanks for your review @lhotari. Here are my answers to your questions:
The current behavior with MessagePayloadProcessor creates an unpredictable interaction pattern. Here's a detailed breakdown of what happens:
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
        .batchingMaxMessages(5)
        .enableBatching(true)
        .create();
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
        .topic(topicName)
        .messagePayloadProcessor(new CustomProcessor())
        .subscriptionName(subscriptionName)
        .receiverQueueSize(0)
        .subscribe();
for (int i = 0; i < totalMessages; i++) {
    String message = messagePredicate + i;
    producer.sendAsync(message.getBytes());
}
producer.flush();
for (int i = 0; i < 10; i++) {
    Message<byte[]> receive = consumer.receive();
}
In this case, here are the steps of what happens:
so, as you can see, this Key Problems:
In my understanding, the fundamental design principle of
We need ZeroQueueConsumer to:
1. Maintain strict pull semantics: each receive() should trigger exactly one broker interaction.
2. Handle batches predictably: either reject batches entirely, or process them atomically without intermediate buffering.
|
I don't think that this is a correct assumption. In Pulsar, the receive method returns the individual messages in a batch. It's very much possible that you have run into bugs in the ZeroQueueConsumerImpl implementation. Looking at the implementation, it doesn't seem to treat batch messages differently. In the Pulsar code, it's a bit of a mess that in some cases the permits are about batch messages and sometimes they are about individual messages. I have created an issue about that in #23263. It's possible that the information in the issue isn't accurate. |
You can reproduce this issue with the example I mentioned above. So I'm wondering: is it OK that the consumer gets blocked indefinitely? |
|
I searched for ZeroQueueConsumerImpl related issues. Now I finally understand the motivation. It appears that ZeroQueueConsumerImpl isn't intended to be used with batch messages at all, which is a bit confusing. Issue #11850 hints at that. It refers to this method: pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java Lines 194 to 211 in dedba1a
Which is the same method that you are calling. I wasn't aware of this. I'll review the changes again. |
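To make the "reject batches before the payload processor runs" idea discussed above concrete, here is a minimal toy sketch. It is not the Pulsar implementation: Entry, PayloadProcessor, UnsupportedBatchException, and dispatch are hypothetical names invented for illustration, and the real change lives in processPayloadByProcessor. The only point shown is the ordering: the zero-queue check happens before the processor gets a chance to expand a batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only; none of these types are Pulsar classes. */
final class ZeroQueueBatchGuardSketch {

    /** Hypothetical stand-in for one broker entry, which may carry several batched messages. */
    static final class Entry {
        final boolean batched;
        final List<String> payloads;

        Entry(boolean batched, List<String> payloads) {
            this.batched = batched;
            this.payloads = payloads;
        }
    }

    /** Hypothetical exception used to signal the rejected batch. */
    static final class UnsupportedBatchException extends RuntimeException {
        UnsupportedBatchException(String message) {
            super(message);
        }
    }

    /** Hypothetical processor hook that expands an entry into individual messages. */
    interface PayloadProcessor {
        List<String> process(Entry entry);
    }

    /** Checks the zero-queue condition first, then delegates to the processor. */
    static List<String> dispatch(Entry entry, int receiverQueueSize, PayloadProcessor processor) {
        // With no receiver queue there is nowhere to hold the remaining messages of a batch,
        // so the batch is rejected before the processor can expand it.
        if (receiverQueueSize == 0 && entry.batched) {
            throw new UnsupportedBatchException("batch entry received with receiverQueueSize=0");
        }
        return processor.process(entry);
    }

    public static void main(String[] args) {
        PayloadProcessor expand = e -> new ArrayList<>(e.payloads);

        // A non-batched entry passes through even with a zero-size queue.
        System.out.println(dispatch(new Entry(false, Arrays.asList("m0")), 0, expand));

        // A batched entry is rejected when the queue size is zero.
        try {
            dispatch(new Entry(true, Arrays.asList("m0", "m1", "m2", "m3", "m4")), 0, expand);
        } catch (UnsupportedBatchException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

In this sketch the same batched entry is accepted when receiverQueueSize is greater than zero, which mirrors the distinction drawn in the discussion between zero-queue consumers and regular ones.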
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
|
please add javadoc to ConsumerBuilder.messagePayloadProcessor about the behavior with receiverQueueSize=0 (don't mention ZeroQueueConsumerImpl in ConsumerBuilder javadoc since it is an implementation detail): pulsar/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java Lines 821 to 827 in dedba1a
This is the receiverQueueSize javadoc: pulsar/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java Lines 377 to 408 in dedba1a
|
lhotari
left a comment
LGTM
|
@lhotari I've added javadoc, please take a look again. |
|
Btw. Not directly related to this PR, but could you check if there's any use for the PulsarClientException.InvalidMessageException that is thrown? Does it get used at all? I think that we have made changes to closing of the consumer, that it will fail all pending operations with PulsarClientException.AlreadyClosedException. |
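The "fail all pending operations on close" behavior mentioned above can be illustrated with a small toy sketch. This is not Pulsar code and makes no claim about how ConsumerImpl actually closes: PendingReceiveCloseSketch and AlreadyClosedSketchException are hypothetical names, and the sketch only shows the general pattern of completing queued futures exceptionally when a consumer shuts down.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Toy model only; not the Pulsar consumer. */
final class PendingReceiveCloseSketch {

    /** Hypothetical stand-in for an "already closed" client exception. */
    static final class AlreadyClosedSketchException extends Exception {
        AlreadyClosedSketchException(String message) {
            super(message);
        }
    }

    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();
    private boolean closed;

    /** Registers a pending receive, or fails it immediately if the consumer is closed. */
    synchronized CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (closed) {
            future.completeExceptionally(new AlreadyClosedSketchException("consumer already closed"));
            return future;
        }
        pendingReceives.add(future);
        return future;
    }

    /** Marks the consumer closed and fails every receive that is still waiting. */
    synchronized void close() {
        closed = true;
        CompletableFuture<String> future;
        while ((future = pendingReceives.poll()) != null) {
            future.completeExceptionally(new AlreadyClosedSketchException("consumer closed"));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingReceiveCloseSketch consumer = new PendingReceiveCloseSketch();
        CompletableFuture<String> pending = consumer.receiveAsync();
        consumer.close();
        try {
            pending.get();
        } catch (ExecutionException e) {
            // The waiting caller is released with the close exception instead of blocking.
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

If every pending operation is failed this way on close, a second exception type raised on the same path would only be observable when it is thrown before the close completes, which is the kind of thing the question above asks to verify.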
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #24610 +/- ##
============================================
- Coverage 74.41% 74.37% -0.05%
- Complexity 32808 33176 +368
============================================
Files 1881 1881
Lines 146776 146823 +47
Branches 16857 16862 +5
============================================
- Hits 109226 109194 -32
- Misses 28901 28953 +52
- Partials 8649 8676 +27
|
@lhotari OK, I'm glad to do that |
Hi, lari, @lhotari
The current usage looks correct and doesn't need modification. However, I did notice and fix some inaccurate javadocs - please take a look when you have time. |
|
@Technoboy- @lhotari |
…when using MessagePayloadProcessor (apache#24610)
…when using MessagePayloadProcessor (apache#24610) (cherry picked from commit bbf56f6) (cherry picked from commit f8ea061)
…when using MessagePayloadProcessor (apache#24610) (cherry picked from commit bbf56f6) (cherry picked from commit b26934b)
…when using MessagePayloadProcessor (apache#24610) (cherry picked from commit bbf56f6)
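Motivation
Currently, ZeroQueueConsumerImpl with MessagePayloadProcessor enabled can incorrectly receive batch messages when it shouldn't. This occurs because:
- The message payload processor processes batch messages before checking if the consumer is a zero-queue consumer
- This behavior contradicts the expected behavior of zero-queue consumers which should not buffer any messages
The issue can be reproduced when:
- MessagePayloadProcessor that processes batch messages
Modifications
- processPayloadByProcessor to detect zero-queue consumers receiving batch messages
- receiveIndividualMessagesFromBatch
- testZeroQueueSizeConsumerWithPayloadProcessorReceiveBatchMessage to verify the fix
Verifying this change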
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete
Matching PR in forked repository
PR in forked repository: 3pacccccc#20