[fix][client] Fix duplicate messages caused by seek#16171

Closed
nodece wants to merge 1 commit into
apache:masterfrom
nodece:fix-duplicate-messages-seek

Conversation

@nodece

@nodece nodece commented Jun 22, 2022

Member

Signed-off-by: Zixuan Liu nodeces@gmail.com

Motivation

When a consumer subscribes to a partitioned topic, performs a seek, and then consumes messages, it sometimes receives duplicate messages.

The root cause is that if there is a pending task in the pendingReceives queue when the seek is performed, the consumer still receives old messages from before the seek.

Modifications

  • Add a seek check to the message-receiving logic: while a seek operation is in progress, skip putting the message into the incomingMessages queue.
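
As a rough illustration of the check described above, here is a minimal, self-contained sketch. It does not use the Pulsar client classes: `SeekGuardSketch`, `beginSeek`, `completeSeek`, `messageReceived`, and `drain` are hypothetical names invented for this example, and only the idea (drop messages that arrive while a seek is in progress) comes from the PR description.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model (not Pulsar code): messages arriving during a seek are not queued. */
class SeekGuardSketch {
    private final Queue<String> incomingMessages = new ArrayDeque<>();
    private boolean duringSeek = false;

    /** Marks the start of a seek operation. */
    synchronized void beginSeek() {
        duringSeek = true;
    }

    /** Marks the end of a seek operation. */
    synchronized void completeSeek() {
        duringSeek = false;
    }

    /** Returns false when the message was skipped because a seek is in progress. */
    synchronized boolean messageReceived(String message) {
        if (duringSeek) {
            return false; // skip: this message predates the new position
        }
        incomingMessages.add(message);
        return true;
    }

    /** Removes and returns everything currently queued. */
    synchronized List<String> drain() {
        List<String> out = new ArrayList<>(incomingMessages);
        incomingMessages.clear();
        return out;
    }

    public static void main(String[] args) {
        SeekGuardSketch consumer = new SeekGuardSketch();
        consumer.beginSeek();
        consumer.messageReceived("in-flight"); // dropped by the guard
        consumer.completeSeek();
        consumer.messageReceived("fresh");
        System.out.println(consumer.drain()); // prints [fresh]
    }
}
```

The sketch only guards new arrivals; it says nothing about messages that were queued before the seek started.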

Verifying this change

org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic covers this change.

Documentation

  • doc-not-needed

@github-actions github-actions Bot added the doc-not-needed Your PR changes do not impact docs label Jun 22, 2022
@nodece

nodece commented Jun 23, 2022

Member Author

/pulsarbot rerun-failure-checks

@BewareMyPower

Contributor

org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic covers this change.

Why didn't it fail before?

@nodece

nodece commented Jun 23, 2022

Member Author

org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic covers this change.

Why didn't it fail before?

@BewareMyPower I'm not sure how to explain it: the issue cannot be reproduced on the master branch, but it can be reproduced with #15568.

@BewareMyPower

Contributor

Do you mean that this test fails after applying the changes from #15568?

@nodece

nodece commented Jun 23, 2022

Member Author

Do you mean that this test fails after applying the changes from #15568?

Yes, could you take a look at this issue?

@BewareMyPower

Contributor

Sure.

@codelipenghui codelipenghui added this to the 2.11.0 milestone Jun 28, 2022
@BewareMyPower

Contributor

Yes, this PR fixes the race condition when seek is called after subscribe, but I think there is another bug: subscribe doesn't appear to be a synchronous operation. Without this PR, the test could still pass after sleeping for a while before seek.

        org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient
                .newConsumer(Schema.STRING).startMessageIdInclusive()
                .topics(Arrays.asList(topicName, topicName2)).subscriptionName("my-sub").subscribe();

        Thread.sleep(2000);

        consumer.seek((partitionedTopic) -> {

I think the root cause is that MultiTopicsConsumerImpl#subscribeFuture() completed too early.
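
To make the "completed too early" concern concrete, here is a generic sketch of an aggregate future that only completes once every per-topic future has completed. It is not taken from MultiTopicsConsumerImpl: `SubscribeAllSketch`, `subscribeAll`, and the two placeholder futures are assumptions made for illustration, and only `CompletableFuture.allOf` is a real JDK API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Generic illustration (not Pulsar code) of waiting for all per-topic subscriptions. */
class SubscribeAllSketch {

    /** Returns a future that completes only after every per-topic future has completed. */
    static CompletableFuture<Void> subscribeAll(List<CompletableFuture<Void>> perTopic) {
        return CompletableFuture.allOf(perTopic.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the subscriptions to two topics.
        CompletableFuture<Void> topicA = new CompletableFuture<>();
        CompletableFuture<Void> topicB = new CompletableFuture<>();
        CompletableFuture<Void> all = subscribeAll(List.of(topicA, topicB));

        topicA.complete(null);
        System.out.println(all.isDone()); // prints false: topicB is still pending

        topicB.complete(null);
        System.out.println(all.isDone()); // prints true
    }
}
```

If an aggregate future were completed before the last per-topic future, a caller could invoke seek while some internal consumers are still being set up, which is the kind of race discussed here.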

@nodece

nodece commented Jun 28, 2022

Member Author

subscribe is an async task. If you sleep for 2 seconds, we have already received all the messages from the broker. If you remove the sleep, some messages are still arriving at the client from the broker.

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece nodece force-pushed the fix-duplicate-messages-seek branch from 6b8ef55 to 25abae8 Compare July 12, 2022 15:25
@nodece nodece requested review from RobertIndie and Technoboy- July 14, 2022 09:05
@codelipenghui codelipenghui modified the milestones: 2.11.0, 2.12.0 Jul 26, 2022
syhily added a commit to streamnative/flink that referenced this pull request Aug 11, 2022
syhily added a commit to streamnative/flink that referenced this pull request Aug 12, 2022
tisonkun pushed a commit to apache/flink that referenced this pull request Aug 12, 2022
…ting logic for better handle the checkpoint. (#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171

(cherry picked from commit 18d21a0)
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
tisonkun pushed a commit to apache/flink that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (#19972) (#20565)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
tisonkun pushed a commit to apache/flink that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (#19972) (#20564)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
@github-actions


The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions Bot added the Stale label Aug 25, 2022
        }
        messagesFuture.thenAcceptAsync(messages -> {
            if (consumer.isDuringSeek()) {
                receiveMessageFromConsumer(consumer, batchReceive);

Contributor

When incomingMessages is not empty, there are two possibilities:

  1. It loops until there are no messages left in incomingMessages.
  2. If ConsumerImpl.seek fails, the messages that were popped from incomingMessages cannot be consumed until a redelivery happens.

Contributor

I also have a question about this part.

It looks like this doesn't only come from the MultiTopicConsumer. On the user side, if one thread uses a consumer to receive messages while another thread tries to seek the subscription to another position, they will also receive duplicate messages, right?

Member Author

2. If ConsumerImpl.seek fails, the messages that were popped from incomingMessages cannot be consumed until a redelivery happens.

Good catch! We need to consider this.

It looks like this doesn't only come from the MultiTopicConsumer.

Yes.

On the user side, if one thread uses a consumer to receive messages while another thread tries to seek the subscription to another position, they will also receive duplicate messages, right?

For MultiTopicConsumer, this is right, because the MultiTopicConsumer has a loop that pulls the messages.

Contributor

OK, I guess this fix only covers the case where the MultiTopicConsumer polls messages from the internal consumer; it can't fix the issue that users face. After messages have been polled from the internal consumer into the MultiTopicConsumer's queue, the user can still get duplicate messages during the seek operation?
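
The gap described here can be shown with a small toy model. It is only a sketch under stated assumptions: `QueuedBeforeSeekSketch`, `onInnerMessage`, `beginSeek`, and `receive` are hypothetical names, and the single `outerQueue` stands in for the queue of the MultiTopicConsumer; none of this is the actual client code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model (not Pulsar code): a guard on arrival does not purge what is already queued. */
class QueuedBeforeSeekSketch {
    private final Queue<String> outerQueue = new ArrayDeque<>();
    private boolean duringSeek = false;

    /** Called when an internal consumer hands over a message. */
    synchronized void onInnerMessage(String message) {
        if (!duringSeek) {
            outerQueue.add(message);
        }
    }

    /** Marks the start of a seek; note that the queue is left untouched. */
    synchronized void beginSeek() {
        duringSeek = true;
    }

    /** User-facing receive: returns the next queued message, or null if none. */
    synchronized String receive() {
        return outerQueue.poll();
    }

    public static void main(String[] args) {
        QueuedBeforeSeekSketch consumer = new QueuedBeforeSeekSketch();
        consumer.onInnerMessage("pre-seek"); // queued before the seek starts
        consumer.beginSeek();
        consumer.onInnerMessage("in-flight"); // dropped by the guard
        System.out.println(consumer.receive()); // prints pre-seek
    }
}
```

In this model the user still receives the message queued before the seek, even though the seek is in progress.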

Member Author

OK, I guess this fix only covers the case where the MultiTopicConsumer polls messages from the internal consumer.

Right.

the user can still get duplicate messages during the seek operation?

I think we need to figure out the details of this seek.

  1. When should incomingMessages be cleared, before or after the seek?
  2. During the seek, can the client continue to consume from incomingMessages?

huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
MartijnVisser pushed a commit to MartijnVisser/flink-connector-pulsar that referenced this pull request Nov 28, 2022
…ting logic for better handle the checkpoint. (#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
MartijnVisser pushed a commit to MartijnVisser/flink-connector-pulsar that referenced this pull request Nov 29, 2022
…ting logic for better handle the checkpoint. (#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
MartijnVisser pushed a commit to apache/flink-connector-pulsar that referenced this pull request Nov 29, 2022
…ting logic for better handle the checkpoint. (#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
@nodece

nodece commented Mar 25, 2023

Member Author

There are no reviewers, so I'm closing this PR.

@nodece nodece closed this Mar 25, 2023
@hicolour

hicolour commented May 2, 2023


Hey @nodece, why was this PR closed? Does it mean that this PR and #15568 will not be delivered to the mainstream?

@nodece

nodece commented May 3, 2023

Member Author

Hey @nodece, why was this PR closed?

Currently, no reviewer is pushing this PR forward, so I decided to close it. If anyone is willing to push it forward, I will reopen it.

Does it mean that this PR and #15568 will not be delivered to the mainstream?

Right!

Labels

doc-not-needed Your PR changes do not impact docs Stale

5 participants