[fix][client] Fix duplicate messages caused by seek#16171
Conversation
|
/pulsarbot rerun-failure-checks |
Why didn't it fail before? |
@BewareMyPower I don't know how to explain, the master branch cannot reproduce this issue, but the #15568 can reproduce this issue. |
|
Did you mean after applying the changes of #15568 will this test fail? |
Yes, could you take a look this issue? |
|
Sure. |
|
Yes this PR fixes the race condition when org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
.newConsumer(Schema.STRING).startMessageIdInclusive()
.topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe();
Thread.sleep(2000);
consumer.seek((partitionedTopic) -> {I think the root cause is that |
|
|
79a9fae to
6b8ef55
Compare
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
6b8ef55 to
25abae8
Compare
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171 (cherry picked from commit 18d21a0)
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) (#20565) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) (#20564) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
|
The pr had no activity for 30 days, mark with Stale label. |
| } | ||
| messagesFuture.thenAcceptAsync(messages -> { | ||
| if (consumer.isDuringSeek()) { | ||
| receiveMessageFromConsumer(consumer, batchReceive); |
There was a problem hiding this comment.
When incomingMessages is not empty, there are two possibilities:
- loop until there is no message in the
incomingMessages - If
ConsumerImpl.seekfails, the messages have popped fromincomingMessagescannot be consumed untilredeliverexecutes
There was a problem hiding this comment.
I also have a question about this part.
It looks like not only from the MultiTopicConsumer. If it is from the user side, using a consumer to receive the messages and another thread try to seek the subscription to another position, they will also receive the duplicated messages right?
There was a problem hiding this comment.
2. If
ConsumerImpl.seekfails, the messages have popped fromincomingMessagescannot be consumed untilredeliverexecutes
Good catch! We need to consider this.
It looks like not only from the MultiTopicConsumer.
Yes.
If it is from the user side, using a consumer to receive the messages and another thread try to seek the subscription to another position, they will also receive the duplicated messages right?
For MultiTopicConsumer, this is right, because the MultiTopicConsumer has a loop to pulling the messsage.
There was a problem hiding this comment.
Ok, I guess the fix can only fix the case that the MultiTopicConsumer poll messages from the internal consumer, but it can't fix the issue that the user facing. After the message polled from the internal consumer to the queue of the MultiTopicConsumer, the user will still have chance to get duplicated messages during the seek operation?
There was a problem hiding this comment.
Ok, I guess the fix can only fix the case that the MultiTopicConsumer poll messages from the internal consumer.
Right.
the user will still have chance to get duplicated messages during the seek operation?
I think we need to figure out the details of this seek.
- When to clean
incomingMessages, after or before seek? - During the seek, can the client continue to consume the
incomingMessages?
…ting logic for better handle the checkpoint. (apache#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
…ting logic for better handle the checkpoint. (#19972) * Change the initial start cursor and stop cursor to better handle the consuming behaviors. * Create the initial subscription instead seek every time. This should fix the wrong position setting. * Fix the wrong stop cursor, make sure it stops at the correct space * Drop Consumer.seek() for apache/pulsar#16171
|
No reviewer, so close this PR. |
Signed-off-by: Zixuan Liu nodeces@gmail.com
Motivation
When subscribing to a message on a partitioned topic, do the seek operation and then consume the messages, which sometimes will receive duplicate messages.
The root cause is that when a seek operation is performed if have the task in the
pendingReceivesqueue, we will get old messages from before doing the seek operation.Modifications
incomingMessagesqueueVerifying this change
org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopiccover this changes.Documentation
doc-not-needed