[fix][broker] Fix consume stuck of shared streaming dispatcher #18315
@poorbarcode PTAL
poorbarcode
left a comment
#18314 is designed to solve the problem of sendInProgress being set to true multiple times by normal-type reads.
Instead of isLast, we should use another identifier to indicate that this entry is the last entry to be read:
- E.g. 1: the last entry in the queue issuedReads
- E.g. 2: the entry preceding pendingReads.peek
Both of these implementations would be somewhat difficult because the two queues are constantly changing.
  dispatchMessagesThread.execute(safeRun(() -> {
-     if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), ctx.isLast())) {
+     if (sendMessagesToConsumers(readType, Lists.newArrayList(entry),
+             readType == ReadType.Normal || ctx.isLast())) {
Only normal-type reads will run here, so readType == ReadType.Normal will always be true.
You are right. I have tried to separate sendInProgress and sendInProgressReplay so that sendInProgress for normal-type reads doesn't affect sendInProgressReplay for replay-type reads. Please take another look.
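A minimal sketch of the two-flag idea described above, assuming one flag per read type. The class and its `begin`/`end`/`inProgress` methods are hypothetical names for illustration; only `sendInProgress`, `sendInProgressReplay`, and `ReadType` come from the thread.

```java
// Hypothetical model, not the Pulsar dispatcher: one flag per read type.
final class SeparateFlagsSketch {
    enum ReadType { Normal, Replay }

    private boolean sendInProgress;        // tracks normal-type reads
    private boolean sendInProgressReplay;  // tracks replay-type reads

    // Marks a send of the given type as started.
    synchronized void begin(ReadType type) {
        if (type == ReadType.Normal) {
            sendInProgress = true;
        } else {
            sendInProgressReplay = true;
        }
    }

    // Marks a send of the given type as finished; the other flag is untouched.
    synchronized void end(ReadType type) {
        if (type == ReadType.Normal) {
            sendInProgress = false;
        } else {
            sendInProgressReplay = false;
        }
    }

    synchronized boolean inProgress(ReadType type) {
        return type == ReadType.Normal ? sendInProgress : sendInProgressReplay;
    }
}
```

With this shape, finishing a replay send cannot clear the flag that a normal send is still relying on.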
Codecov Report
@@ Coverage Diff @@
## master #18315 +/- ##
============================================
- Coverage 46.98% 46.16% -0.83%
+ Complexity 10343 9936 -407
============================================
Files 692 667 -25
Lines 67766 65699 -2067
Branches 7259 7029 -230
============================================
- Hits 31842 30332 -1510
+ Misses 32344 31946 -398
+ Partials 3580 3421 -159
  protected boolean shouldRewindBeforeReadingOrReplaying = false;
  protected final String name;
  protected boolean sendInProgress;
+ protected boolean sendInProgressReplay;
Maybe we can combine the two variables into a counter? Something like this:
AtomicInteger sendingTaskCounter;
sendingTaskCounter.incrementAndGet();
try {
    trySendMessagesToConsumer();
} finally {
    int runningTaskCount = sendingTaskCounter.decrementAndGet();
    if (runningTaskCount == 0) {
        readMoreEntries();
    }
}
I think this makes the logic easier to understand.
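A self-contained sketch of the counter suggestion above. The class name, the `runSendTask` wrapper, and the `readMoreEntriesCalls` field are hypothetical; the `Runnable` stands in for the real send method, whose body is not shown in the thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the dispatcher; only the counting logic is modeled.
final class SendingTaskCounterSketch {
    private final AtomicInteger sendingTaskCounter = new AtomicInteger();
    int readMoreEntriesCalls = 0; // counts how often the next read was triggered

    // Wraps one send task; `send` stands in for the real send method.
    void runSendTask(Runnable send) {
        sendingTaskCounter.incrementAndGet();
        try {
            send.run();
        } finally {
            // Only the task that brings the counter back to zero triggers the next read.
            if (sendingTaskCounter.decrementAndGet() == 0) {
                readMoreEntries();
            }
        }
    }

    private void readMoreEntries() {
        readMoreEntriesCalls++;
    }

    public static void main(String[] args) {
        SendingTaskCounterSketch d = new SendingTaskCounterSketch();
        // A nested task models a replay send overlapping with a normal send.
        d.runSendTask(() -> d.runSendTask(() -> { }));
        System.out.println(d.readMoreEntriesCalls);
    }
}
```

In the nested case the inner task leaves the counter at 1, so only the outer task triggers a read.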
Good idea, but it seems we would need to remove sendInProgress = false in PersistentStickyKeyDispatcherMultipleConsumers. I use a delayed execution of readMoreEntries to replace it.
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
  // stuckConsumers for avoid stopping dispatch.
  sendInProgress = false;
- topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
+ topic.getBrokerService().executor().schedule(safeRun(this::readMoreEntries), 100, TimeUnit.MILLISECONDS);
Why the 100ms delay?
Because sendingTaskCounter is decremented after trySendMessagesToConsumers executes, I think it can replace sendInProgress = false on line 325.
readMoreEntries is triggered in the method sendMessagesToConsumers, can we remove these two lines?
topic.getBrokerService().executor().schedule(safeRun(this::readMoreEntries), 100, TimeUnit.MILLISECONDS);
It seems that delayed execution isn't needed, since trySendMessagesToConsumers is also within the synchronized block. I think sendInProgress = false can simply be removed.
@codelipenghui @congbobo184 @Technoboy- @mattisonchao Could you take a look?
Force-pushed from 7c39d19 to 2a05e08
…reamingDispatcherMultipleConsumers
  }

  protected final synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
      java.util.function.Consumer<Boolean> consumer = asyncRead -> {
Is it possible to avoid creating a new java.util.function.Consumer for each consumer's message dispatching?
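One way to address the question above, sketched under assumptions: hoist the callback into a final field so it is created once per dispatcher instance rather than once per call. What the real callback does is not visible in the snippet, so the body here is a placeholder, and the class, field, and method names are hypothetical.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the dispatcher; only the callback reuse is modeled.
final class ReusedCallbackSketch {
    int asyncReads = 0;
    int syncReads = 0;

    // Allocated once per instance; the body is a placeholder for the real logic.
    private final Consumer<Boolean> afterSend = asyncRead -> {
        if (asyncRead) {
            asyncReads++;
        } else {
            syncReads++;
        }
    };

    // Each call reuses the same callback object instead of building a new lambda.
    void sendMessagesToConsumers(boolean asyncRead) {
        afterSend.accept(asyncRead);
    }

    Consumer<Boolean> callback() {
        return afterSend;
    }
}
```

This only works if the callback does not need per-call local state; anything it needs from the call would have to be passed as an argument or kept in fields.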
      sendMessagesToConsumers(readType, entries);
  }

  protected synchronized void sendInProgressAcquire() {
nit:
- protected synchronized void sendInProgressAcquire() {
+ protected synchronized void acquireSendInProgress() {
      sendInProgress = true;
  }

  protected synchronized void sendInProgressRelease() {
nit:
- protected synchronized void sendInProgressRelease() {
+ protected synchronized void releaseSendInProgress() {
      sendInProgress = false;
  }

  protected synchronized boolean sendInProgress() {
nit:
- protected synchronized boolean sendInProgress() {
+ protected synchronized boolean isSendInProgress() {
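Taken together, the three renames suggested above would give accessors along these lines. This is a sketch only: the enclosing class is hypothetical, and the body of `isSendInProgress()` is an assumption since the diff shows only its signature.

```java
// Hypothetical holder class; the real methods live in the dispatcher.
final class SendInProgressSketch {
    private boolean sendInProgress;

    // Marks a send as started (body as shown in the diff).
    protected synchronized void acquireSendInProgress() {
        sendInProgress = true;
    }

    // Marks the send as finished (body as shown in the diff).
    protected synchronized void releaseSendInProgress() {
        sendInProgress = false;
    }

    // Assumed body: simply reports the current flag value.
    protected synchronized boolean isSendInProgress() {
        return sendInProgress;
    }
}
```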
# Conflicts:
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
  // readMoreEntries should run regardless whether or not stuck is caused by
  // stuckConsumers for avoid stopping dispatch.
  sendInProgress = false;
  topic.getBrokerService().executor().execute(safeRun(this::readMoreEntries));
Should we remove these two lines (line 325 and line 327) and make this method return true in these two branches? Then readMoreEntries will be triggered by super.readEntriesComplete.
You are right.
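A rough sketch of the control flow agreed on above, where the subclass returns true and the base class decides when to read again. All class names, the `consumersStuck` flag, and the meaning of a `false` return (a later completion triggers the read) are assumptions for illustration; the real methods have different signatures.

```java
// Hypothetical stand-ins; method names follow the discussion, bodies are placeholders.
class DispatcherSketch {
    int readMoreEntriesCalls = 0;

    // Returns true when the caller should trigger the next read.
    protected boolean trySendMessagesToConsumers(int entryCount) {
        return true;
    }

    // Single place that reacts to the return value.
    final void readEntriesComplete(int entryCount) {
        if (trySendMessagesToConsumers(entryCount)) {
            readMoreEntries();
        }
    }

    void readMoreEntries() {
        readMoreEntriesCalls++;
    }
}

final class StickyKeyDispatcherSketch extends DispatcherSketch {
    boolean consumersStuck = false; // placeholder for the "stuck" branches in the diff

    @Override
    protected boolean trySendMessagesToConsumers(int entryCount) {
        if (entryCount == 0 || consumersStuck) {
            // Instead of resetting the flag and scheduling readMoreEntries here,
            // report back and let readEntriesComplete trigger it.
            return true;
        }
        // Assumed: dispatch continues elsewhere and triggers the read on completion.
        return false;
    }
}
```

The subclass then no longer touches the in-progress flag or the executor in its early-exit branches.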
poorbarcode
left a comment
LGTM
/pulsarbot run-failure-checks
@codelipenghui PTAL
Fixes #18314
Motivation
In #18289, we set sendInProgress to false only after the last entry in a batch of messages has been delivered, but this can make consumption get stuck: StreamingEntryReader.pendingReads may be non-empty when no entries are available from the ledger, which causes ctx.isLast() to always be false.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
Lines 172 to 177 in 82237d3
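The stuck condition described in the motivation can be modeled roughly as follows. This is a simplified sketch, not the real classes: it assumes isLast is derived from whether pendingReads is empty, and the class and method names other than `pendingReads`, `sendInProgress`, and `isLast` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model, not the real Pulsar classes: names mirror the discussion only.
final class StuckDispatchSketch {
    final Queue<Long> pendingReads = new ArrayDeque<>(); // reads still waiting on the ledger
    boolean sendInProgress;

    // Assumption: an entry counts as "last" only if no reads remain pending.
    boolean isLast() {
        return pendingReads.isEmpty();
    }

    // Behaviour attributed to #18289: clear the flag only after the last entry.
    void onEntryDelivered() {
        if (isLast()) {
            sendInProgress = false;
        }
    }

    // A new read can be issued only when no send is marked as in progress.
    boolean canReadMore() {
        return !sendInProgress;
    }
}
```

If a read stays pending because the ledger has no entries yet, `isLast()` never becomes true, the flag is never cleared, and `canReadMore()` stays false.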
Modifications
- Add acquireSendInProgress()/releaseSendInProgress()/isSendInProgress()
- Use sendingTaskCounter instead of sendInProgress to avoid sendInProgress being set to true and false multiple times.
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete
Matching PR in forked repository
PR in forked repository: coderzc#26