@poorbarcode
Contributor

@poorbarcode poorbarcode commented Sep 2, 2022

Fixes

Motivation

There is a race condition between adding messages to the incoming queue and clearing the incoming queue.


When we call MultiTopicsConsumerImpl.seek, we expect the process to execute like this:

  • clear incoming queue
  • each consumer executes seek
    • consumer_1 seek
    • consumer_2 seek
  • wait for all consumers to finish seeking
  • receive messages

But the real process might be executed like this:

  • clear incoming queue
  • (Highlight) receiveMessageFromConsumer is executing at this time, so many messages are added to the incoming queue
  • each consumer executes seek
    • consumer_1 seek
    • consumer_2 seek
  • (Highlight) the receiveMessageFromConsumer call from step 2 is still executing, so more messages are added to the incoming queue at this step
  • wait for all consumers to finish seeking
  • receive messages

Then the same message will be consumed twice:

  • add message(3:3) to the incoming queue
  • seek to 3:1
  • add message[ (3:1),(3:2),(3:3) ] to the incoming queue
  • (Highlight) the incoming queue now has 4 messages [ (3:3), (3:1),(3:2),(3:3) ], so (3:3) appears twice
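
The interleaving above can be replayed on a single thread to make the duplicate visible. This is only a sketch: `SeekRaceDemo`, `replay`, and the string message IDs are hypothetical stand-ins, not Pulsar classes, and the real race needs concurrent threads to occur.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the consumer's incoming queue; not Pulsar code.
class SeekRaceDemo {
    // Replays the problematic ordering step by step so the outcome is deterministic.
    static List<String> replay() {
        ConcurrentLinkedQueue<String> incomingQueue = new ConcurrentLinkedQueue<>();

        // Seek starts by clearing the incoming queue.
        incomingQueue.clear();
        // A receive that was already in flight still delivers a pre-seek message.
        incomingQueue.add("3:3");
        // The child consumers seek back to 3:1 and redeliver from there.
        incomingQueue.add("3:1");
        incomingQueue.add("3:2");
        incomingQueue.add("3:3");

        return new ArrayList<>(incomingQueue);
    }

    public static void main(String[] args) {
        // Prints the queue contents; "3:3" shows up twice.
        System.out.println(replay());
    }
}
```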

How to reproduce?

Run TopicReaderTest.testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic 100 to 400 times.


Modifications

Split seek into a two-part action:

part-1:

  • when seek starts, call pause
  • while the status is paused, receiveMessageFromConsumer actions do not execute immediately; they are put into the pending queue

part-2:

  • after all consumers have finished seeking, call resume
  • execute all actions in the pending queue
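
A minimal sketch of the two-part idea, assuming a simple lock-based gate. `PausableReceiveGate`, `submit`, and `demo` are hypothetical names for illustration; the actual change in MultiTopicsConsumerImpl may be structured differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical gate; the names are illustrative and are not taken from the PR.
class PausableReceiveGate {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean paused;

    // Part 1: called when seek starts.
    synchronized void pause() {
        paused = true;
    }

    // Runs the action now, or parks it in the pending queue while paused.
    synchronized void submit(Runnable receiveAction) {
        if (paused) {
            pending.add(receiveAction);
        } else {
            receiveAction.run();
        }
    }

    // Part 2: called after every child consumer has finished seeking.
    synchronized void resume() {
        paused = false;
        Runnable action;
        while ((action = pending.poll()) != null) {
            action.run();
        }
    }

    static List<String> demo() {
        List<String> incomingQueue = new ArrayList<>();
        int[] nextEntry = {3}; // the child consumer would deliver 3:3 next
        PausableReceiveGate gate = new PausableReceiveGate();

        gate.pause();          // seek starts
        incomingQueue.clear();
        // A receive action arrives mid-seek: it is parked instead of adding 3:3.
        gate.submit(() -> incomingQueue.add("3:" + nextEntry[0]++));
        nextEntry[0] = 1;      // the seek to 3:1 completes
        gate.resume();         // the parked action now reads from the new position
        gate.submit(() -> incomingQueue.add("3:" + nextEntry[0]++));
        gate.submit(() -> incomingQueue.add("3:" + nextEntry[0]++));
        return incomingQueue;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the parked action runs only after the position has moved, so no pre-seek message reaches the queue. Running actions while holding the lock keeps the example short; a real client would likely hand them to an executor instead.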

Documentation

  • doc-required

  • doc-not-needed

  • doc

  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 2, 2022
@Technoboy- Technoboy- added this to the 2.12.0 milestone Sep 5, 2022
@poorbarcode
Contributor Author

poorbarcode commented Sep 6, 2022

This PR should be merged into these branches (because this test was added in branch-2.11):

  • branch-2.8
  • branch-2.9
  • branch-2.10
  • branch-2.11
  • master

Note: new features have been added in branch-2.8, so if we want to cherry-pick to branch-2.7, there will be conflicts.

@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

@codelipenghui
Contributor

@poorbarcode Could you please provide more context about how the race condition will happen?

@codelipenghui codelipenghui modified the milestones: 2.11.0, 2.12.0 Sep 9, 2022
@codelipenghui codelipenghui added release/2.11.1 type/bug The PR fixed a bug or issue reported a bug area/client and removed type/flaky-tests labels Sep 9, 2022
@poorbarcode
Contributor Author

Hi @codelipenghui

I have added the description. Sorry that I didn't finish the TODO in time.


private CompletableFuture<Void> internalSeekAsync(Function<Consumer, CompletableFuture<Void>> childSeekFunction) {
    pause();
    CompletableFuture<Void> res = FutureUtil.waitForAll(receiveMessageFutures).thenCompose(__ -> {
Contributor

Do we need to wait for these ongoing receives to complete? The messages will be discarded anyway, so can we just cancel them?

Contributor Author

@poorbarcode poorbarcode Sep 13, 2022

Good suggestion, but we have no way to cancel them, and the wait only happens in one scenario: seek is called several times in quick succession while reads are in progress, which is very rare.
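
For readers following this thread, the "wait for in-flight receives, then seek" ordering can be sketched with the JDK alone. `WaitForInFlightReceives` and its `waitForAll` helper are hypothetical and built on `CompletableFuture.allOf`; they only stand in for the `FutureUtil.waitForAll` call in the snippet under review and are not its implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; not the Pulsar implementation.
class WaitForInFlightReceives {
    // Stand-in helper: completes once every future in the list has completed.
    static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> receive1 = new CompletableFuture<>();
        CompletableFuture<Void> receive2 = new CompletableFuture<>();

        // The seek step is chained behind all in-flight receives.
        CompletableFuture<Void> seek =
                waitForAll(Arrays.asList(receive1, receive2)).thenRun(() -> events.add("seek"));

        events.add("receive-1 done");
        receive1.complete(null);   // one receive is still pending, so seek has not run
        events.add("receive-2 done");
        receive2.complete(null);   // the last receive finishes and the seek step runs

        seek.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```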

@codelipenghui
Contributor

@poorbarcode Thanks. I understand the problem now.

There is a mailing-list discussion at https://lists.apache.org/thread/97o9t4ltkds5pfq41l9xbbd31t41qm8w
I think we'd better find a general solution for message duplication.

This one, https://lists.apache.org/thread/gnqwxo7w6n6g72ochvgpgv4s6r8mnwb7, is also a discussion about duplicated messages.

These are not exactly the same problem, but they are closely related.

As for this PR, it introduces many queue operations to avoid duplicated messages.
This seems to be a bit expensive.

@poorbarcode
Contributor Author

PIP-194 will fix this problem

Labels

area/client doc-not-needed Your PR changes do not impact docs release/2.8.5 release/2.9.4 release/2.10.3 release/2.11.1 type/bug The PR fixed a bug or issue reported a bug

4 participants