@coderzc
Member

@coderzc coderzc commented Nov 3, 2022

Fixes #18314

Motivation

In #18289, we set sendInProgress to false only after the last entry in a batch of messages has been delivered. This can leave consumption stuck: StreamingEntryReader.pendingReads may be non-empty when there are no available entries from the ledger, which causes ctx.isLast() to always be false.

// All request has been completed, mark returned entry as last.
if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
    firstPendingReadEntryRequest.isLast = true;
    STATE_UPDATER.set(this, State.Completed);
}
dispatcher.readEntryComplete(readEntry, firstPendingReadEntryRequest);
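
The stall can be reproduced in miniature. The sketch below is not the Pulsar code: `IsLastStallSketch`, `completeRead`, and the string-valued queues are hypothetical stand-ins, and it assumes, as described above, that the send flag is released only when a completed read is marked as last.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the behaviour described above; not the Pulsar classes.
final class IsLastStallSketch {
    final Deque<String> issuedReads = new ArrayDeque<>();
    final Deque<String> pendingReads = new ArrayDeque<>();
    boolean sendInProgress;

    // Completes the oldest issued read and reports whether it was marked as last.
    boolean completeRead() {
        issuedReads.poll();
        // Same condition as the snippet above: isLast needs both queues to be empty.
        boolean isLast = issuedReads.isEmpty() && pendingReads.isEmpty();
        sendInProgress = true;       // a dispatch starts for the completed entry
        if (isLast) {
            sendInProgress = false;  // assumed: released only for the "last" entry
        }
        return isLast;
    }

    public static void main(String[] args) {
        IsLastStallSketch reader = new IsLastStallSketch();
        reader.issuedReads.add("read-1");
        reader.pendingReads.add("read-2"); // waits for an entry that is not yet in the ledger
        boolean last = reader.completeRead();
        System.out.println("isLast=" + last + ", sendInProgress=" + reader.sendInProgress);
    }
}
```

In this model the only completed read is never marked as last because `read-2` is still pending, so the flag stays set and nothing triggers the next read.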

Modifications

  • Abstract out acquireSendInProgress()/releaseSendInProgress()/isSendInProgress()
  • In the subclass PersistentStreamingDispatcherMultipleConsumers, use sendingTaskCounter instead of sendInProgress, so that sendInProgress is not set to true and false multiple times.
  • Clean up useless code
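
As a rough illustration of the first two items, the sketch below puts the three accessors on a base class and overrides them with a counter in a subclass. `FlagDispatcherSketch`, `CountingDispatcherSketch`, and `SendInProgressDemo` are hypothetical names, and the bodies are assumptions rather than the merged implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical base class: a single boolean flag guards dispatching.
class FlagDispatcherSketch {
    private boolean sendInProgress;

    protected synchronized void acquireSendInProgress() { sendInProgress = true; }
    protected synchronized void releaseSendInProgress() { sendInProgress = false; }
    protected synchronized boolean isSendInProgress() { return sendInProgress; }
}

// Hypothetical streaming subclass: each acquire must be matched by one release.
class CountingDispatcherSketch extends FlagDispatcherSketch {
    private final AtomicInteger sendingTaskCounter = new AtomicInteger();

    @Override protected synchronized void acquireSendInProgress() { sendingTaskCounter.incrementAndGet(); }
    @Override protected synchronized void releaseSendInProgress() { sendingTaskCounter.decrementAndGet(); }
    @Override protected synchronized boolean isSendInProgress() { return sendingTaskCounter.get() > 0; }
}

final class SendInProgressDemo {
    public static void main(String[] args) {
        FlagDispatcherSketch flag = new FlagDispatcherSketch();
        CountingDispatcherSketch counter = new CountingDispatcherSketch();
        for (FlagDispatcherSketch d : new FlagDispatcherSketch[] {flag, counter}) {
            d.acquireSendInProgress();
            d.acquireSendInProgress();
            d.releaseSendInProgress(); // one of the two sends has finished
        }
        System.out.println("flag=" + flag.isSendInProgress() + ", counter=" + counter.isSendInProgress());
    }
}
```

With two sends in flight and one finished, the boolean version already reports that nothing is in progress, while the counter version still reports one outstanding send.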

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: coderzc#26

@github-actions github-actions bot added the doc-not-needed label Nov 3, 2022
@codelipenghui codelipenghui added this to the 2.12.0 milestone Nov 3, 2022
@coderzc
Member Author

coderzc commented Nov 4, 2022

@poorbarcode PTAL

Contributor

@poorbarcode poorbarcode left a comment

#18314 is designed to solve the problem of sendInProgress being set to true multiple times by normal-type reads.

Instead of isLast, we should use another identifier to indicate that this entry is the last entry to be read:

  • E.g. 1: the last entry in the queue issuedReads
  • E.g. 2: the entry preceding pendingReads.peek

Both approaches would be somewhat difficult to implement, because the two queues are constantly changing.

  dispatchMessagesThread.execute(safeRun(() -> {
-     if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), ctx.isLast())) {
+     if (sendMessagesToConsumers(readType, Lists.newArrayList(entry),
+             readType == ReadType.Normal || ctx.isLast())) {
Contributor

Only normal-type reads will run here, so readType == ReadType.Normal will always be true.

Member Author

@coderzc coderzc Nov 7, 2022

You are right. I have tried to separate sendInProgress and sendInProgressReplay, so that sendInProgress for normal-type reads does not affect sendInProgressReplay for replay-type reads. Please take another look.

@codecov-commenter

codecov-commenter commented Nov 7, 2022

Codecov Report

Merging #18315 (5045881) into master (b31c5a6) will decrease coverage by 0.82%.
The diff coverage is 40.44%.

@@             Coverage Diff              @@
##             master   #18315      +/-   ##
============================================
- Coverage     46.98%   46.16%   -0.83%     
+ Complexity    10343     9936     -407     
============================================
  Files           692      667      -25     
  Lines         67766    65699    -2067     
  Branches       7259     7029     -230     
============================================
- Hits          31842    30332    -1510     
+ Misses        32344    31946     -398     
+ Partials       3580     3421     -159     
Flag Coverage Δ
unittests 46.16% <40.44%> (-0.83%) ⬇️

Impacted Files Coverage Δ
.../org/apache/bookkeeper/mledger/impl/EntryImpl.java 69.62% <0.00%> (-11.27%) ⬇️
...lsar/broker/service/RedeliveryTrackerDisabled.java 50.00% <ø> (ø)
...va/org/apache/pulsar/broker/service/ServerCnx.java 48.91% <ø> (+0.22%) ⬆️
...ersistentStreamingDispatcherMultipleConsumers.java 0.00% <0.00%> (ø)
.../java/org/apache/pulsar/client/impl/ClientCnx.java 30.16% <ø> (ø)
...a/org/apache/pulsar/client/impl/TableViewImpl.java 0.00% <0.00%> (ø)
...ar/client/impl/conf/ProducerConfigurationData.java 84.70% <ø> (-0.18%) ⬇️
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 15.07% <12.50%> (+0.03%) ⬆️
...keeper/mledger/impl/cache/RangeEntryCacheImpl.java 45.79% <22.44%> (-6.31%) ⬇️
...eeper/mledger/impl/cache/InflightReadsLimiter.java 30.35% <30.35%> (ø)
... and 93 more

protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
protected boolean sendInProgress;
protected boolean sendInProgressReplay;
Contributor

Maybe we can combine the two variables into a counter? Just like this:

AtomicInteger sendingTaskCounter;

sendingTaskCounter.incrementAndGet();
try {
  trySendMessagesToConsumer();
} finally {
  int runningTaskCount = sendingTaskCounter.decrementAndGet();
  if (runningTaskCount == 0) {
    readMoreEntries();
  }
}

I think this makes the logic easier to understand.
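
For reference, a self-contained version of the counter idea might look like the sketch below. `SendingTaskCounterSketch` and `runSendTask` are hypothetical names, and `readMoreEntries` is stubbed to count its invocations; none of this is taken from the dispatcher itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the counter pattern suggested above.
final class SendingTaskCounterSketch {
    private final AtomicInteger sendingTaskCounter = new AtomicInteger();
    int readMoreEntriesCalls; // counts how often the stubbed read was triggered

    void runSendTask(Runnable trySendMessagesToConsumers) {
        sendingTaskCounter.incrementAndGet();
        try {
            trySendMessagesToConsumers.run();
        } finally {
            // Only the task that brings the counter back to zero triggers the next read,
            // and the decrement happens even if the send task throws.
            if (sendingTaskCounter.decrementAndGet() == 0) {
                readMoreEntries();
            }
        }
    }

    private void readMoreEntries() {
        readMoreEntriesCalls++;
    }

    public static void main(String[] args) {
        SendingTaskCounterSketch d = new SendingTaskCounterSketch();
        // The outer task overlaps with an inner one, standing in for two sends in flight.
        d.runSendTask(() -> d.runSendTask(() -> { }));
        System.out.println("readMoreEntries calls: " + d.readMoreEntriesCalls);
    }
}
```

The inner task leaves the counter at 1, so only the outer task triggers the read.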

Member Author

Good idea, but it seems we would need to remove sendInProgress = false in PersistentStickyKeyDispatcherMultipleConsumers. I used a delayed execution of readMoreEntries to replace it.

// stuckConsumers for avoid stopping dispatch.
sendInProgress = false;
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
topic.getBrokerService().executor().schedule(safeRun(this::readMoreEntries), 100, TimeUnit.MILLISECONDS);
Contributor

Why the 100ms delay?

Member Author

Because sendingTaskCounter is decremented only after trySendMessagesToConsumers has executed. I think it can replace the sendInProgress = false at line 325.

Contributor

readMoreEntries is triggered in the method sendMessagesToConsumers. Can we remove these two lines?

topic.getBrokerService().executor().schedule(safeRun(this::readMoreEntries), 100, TimeUnit.MILLISECONDS);

Member Author

@coderzc coderzc Nov 9, 2022

It seems the execution does not need to be delayed, since trySendMessagesToConsumers also runs within the synchronized block. I think sendInProgress = false can be removed directly.

@poorbarcode
Contributor

@codelipenghui @congbobo184 @Technoboy- @mattisonchao Could you take a look?

@coderzc
Member Author

coderzc commented Nov 11, 2022

@poorbarcode @codelipenghui PTAL

}

protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
java.util.function.Consumer<Boolean> consumer = asyncRead -> {
Contributor

Is it possible to avoid creating a new java.util.function.Consumer for each consumer's message dispatching?
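
One way to do that, sketched under assumptions: hold the callback in a final field so it is built once per dispatcher instead of once per dispatch. `CallbackReuseSketch`, `afterSend`, and the callback body are hypothetical, and this only works if the callback needs no per-call state (such as readType or entries), which is not visible from the snippet under review.

```java
import java.util.function.Consumer;

// Hypothetical dispatcher; the real callback body is not shown in the snippet above.
final class CallbackReuseSketch {
    int readMoreEntriesCalls;

    // Built once per dispatcher. It captures only `this`, so no per-call state is involved.
    private final Consumer<Boolean> afterSend = asyncRead -> {
        if (asyncRead) {
            readMoreEntriesCalls++; // stand-in for scheduling readMoreEntries
        }
    };

    Consumer<Boolean> afterSendCallback() {
        return afterSend;
    }

    synchronized void sendMessagesToConsumers(boolean asyncRead) {
        afterSendCallback().accept(asyncRead); // reuses the same instance on every dispatch
    }

    public static void main(String[] args) {
        CallbackReuseSketch d = new CallbackReuseSketch();
        d.sendMessagesToConsumers(true);
        d.sendMessagesToConsumers(false);
        System.out.println("reads=" + d.readMoreEntriesCalls
                + ", sameInstance=" + (d.afterSendCallback() == d.afterSendCallback()));
    }
}
```

If the callback does depend on per-call values, a private method taking those values as parameters would avoid the functional-interface object altogether.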

sendMessagesToConsumers(readType, entries);
}

protected synchronized void sendInProgressAcquire() {
Contributor

nit:

Suggested change
- protected synchronized void sendInProgressAcquire() {
+ protected synchronized void acquireSendInProgress() {

sendInProgress = true;
}

protected synchronized void sendInProgressRelease() {
Contributor

nit:

Suggested change
- protected synchronized void sendInProgressRelease() {
+ protected synchronized void releaseSendInProgress() {

sendInProgress = false;
}

protected synchronized boolean sendInProgress() {
Contributor

nit:

Suggested change
- protected synchronized boolean sendInProgress() {
+ protected synchronized boolean isSendInProgress() {

# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
sendInProgress = false;
topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
Contributor

@poorbarcode poorbarcode Nov 14, 2022

Should we remove these two lines (line 325 and line 327) and make this method return true in these two logic branches? Then readMoreEntries will be triggered by super.readEntriesComplete.

Member Author

You are right.
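
The shape of that change can be sketched as follows. The class names, the `noConsumerAvailable` flag, and the meaning of the boolean return value are assumptions made for illustration; only the method names readEntriesComplete, trySendMessagesToConsumers, and readMoreEntries come from the discussion.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical base dispatcher: it alone decides when to read more.
abstract class BaseDispatcherSketch {
    int readMoreEntriesCalls;

    final synchronized void readEntriesComplete(List<String> entries) {
        // Assumed contract: true means the caller should trigger another read.
        if (trySendMessagesToConsumers(entries)) {
            readMoreEntries();
        }
    }

    protected abstract boolean trySendMessagesToConsumers(List<String> entries);

    void readMoreEntries() {
        readMoreEntriesCalls++;
    }
}

// Hypothetical sticky-key subclass.
final class StickyKeyDispatcherSketch extends BaseDispatcherSketch {
    boolean noConsumerAvailable;
    int dispatched;

    @Override
    protected boolean trySendMessagesToConsumers(List<String> entries) {
        if (entries.isEmpty()) {
            return false; // nothing was read; no follow-up in this sketch
        }
        if (noConsumerAvailable) {
            // Stuck branch: instead of resetting a flag and scheduling a read here,
            // report it and let the base class trigger readMoreEntries.
            return true;
        }
        dispatched += entries.size();
        return true;
    }

    public static void main(String[] args) {
        StickyKeyDispatcherSketch d = new StickyKeyDispatcherSketch();
        d.noConsumerAvailable = true;
        d.readEntriesComplete(Collections.singletonList("entry-1"));
        System.out.println("reads=" + d.readMoreEntriesCalls + ", dispatched=" + d.dispatched);
    }
}
```

Keeping the read trigger in one place means the subclass no longer has to touch the in-progress state on its own.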

Contributor

@poorbarcode poorbarcode left a comment

LGTM

@coderzc
Member Author

coderzc commented Nov 14, 2022

/pulsarbot run-failure-checks

@coderzc
Member Author

coderzc commented Nov 15, 2022

@codelipenghui PTAL

@codelipenghui codelipenghui merged commit 55863de into apache:master Nov 18, 2022
@Technoboy- Technoboy- changed the title from "[fix][broker] fix consume stuck of shared streaming dispatcher" to "[fix][broker] Fix consume stuck of shared streaming dispatcher" Jul 18, 2023
Development

Successfully merging this pull request may close these issues.

Flaky-test: PersistentTopicStreamingDispatcherE2ETest.testMessageRedelivery

6 participants