Skip to content

[fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState#22572

Merged
codelipenghui merged 3 commits into
apache:masterfrom
shibd:geo_replication
Apr 28, 2024
Merged

[fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState#22572
codelipenghui merged 3 commits into
apache:masterfrom
shibd:geo_replication

Conversation

@shibd

@shibd shibd commented Apr 24, 2024

Copy link
Copy Markdown
Member

Motivation

#22571

Analysis

When enabling replicateSubscriptionState will use topic to sync subscription state, and make these message metadata as Marker

These marker messages will not be sent to the consumer by the topic, and will automatically ack them.

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
final int readerIndex = metadataAndPayload.readerIndex();
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
metadataAndPayload.readerIndex(readerIndex);
}
// Deliver marker to __compaction cursor to avoid compaction task stuck,
// and filter out them when doing topic compaction.
if (msgMetadata == null || cursor == null
|| !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
entries.set(i, null);
entry.release();
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
Collections.emptyMap());
continue;
}

But getLastMessageId will always return the last message position, regardless of whether the last message is marked or not. This will cause the reader stuck.

        while (reader.hasMessageAvailable()) {  // get true
              Message message reader.readNext();  // never can't receive msg.
        }

You can refer to this diagram to help understand this bug:

image

Modifications

  • Add asyncReverseFindPositionOneByOne method on ManagedLedger .
  • Add getLastCanDispatchPosition method on Topic, it will call asyncReverseFindPositionOneByOne to find the last position of entry that not is replistateSubscriptionState
  • Change the getLastMessageId implement of ServerCnx to use getLastCanDispatchPosition instead of getMaxReadPosition.

Verifying this change

  • Add ManagedLedgerTest.testReverseFindPositionOneByOne to cover ReverseFindPositionOneByOne method.
  • Add testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage to cover this bug.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

@shibd shibd self-assigned this Apr 24, 2024
@shibd shibd added type/bug The PR fixed a bug or issue reported a bug ready-to-test labels Apr 24, 2024
@github-actions github-actions Bot added the doc-not-needed Your PR changes do not impact docs label Apr 24, 2024

@dao-jun dao-jun left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good to me, just left some minor comments about the code style

@shibd shibd requested a review from lhotari April 25, 2024 07:22
@shibd shibd force-pushed the geo_replication branch from 6431d8c to c396054 Compare April 26, 2024 00:05
Comment thread managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java Outdated
Comment thread pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java Outdated
@shibd shibd force-pushed the geo_replication branch from c396054 to 99032e8 Compare April 28, 2024 05:13
@shibd shibd requested a review from coderzc April 28, 2024 07:42

@coderzc coderzc left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@codecov-commenter

Copy link
Copy Markdown

Codecov Report

Attention: Patch coverage is 80.00000% with 9 lines in your changes are missing coverage. Please review.

Project coverage is 74.13%. Comparing base (bbc6224) to head (f094e04).
Report is 203 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #22572      +/-   ##
============================================
+ Coverage     73.57%   74.13%   +0.56%     
+ Complexity    32624     2747   -29877     
============================================
  Files          1877     1886       +9     
  Lines        139502   140653    +1151     
  Branches      15299    15462     +163     
============================================
+ Hits         102638   104278    +1640     
+ Misses        28908    28331     -577     
- Partials       7956     8044      +88     
Flag Coverage Δ
inttests 27.23% <57.77%> (+2.64%) ⬆️
systests 24.48% <57.77%> (+0.15%) ⬆️
unittests 73.44% <80.00%> (+0.59%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...sar/broker/service/persistent/PersistentTopic.java 78.66% <100.00%> (+0.20%) ⬆️
...va/org/apache/pulsar/broker/service/ServerCnx.java 72.47% <93.33%> (+0.33%) ⬆️
...n/java/org/apache/pulsar/broker/service/Topic.java 34.78% <0.00%> (-1.59%) ⬇️
...ookkeeper/mledger/util/ManagedLedgerImplUtils.java 69.56% <69.56%> (ø)

... and 278 files with indirect coverage changes

@codelipenghui codelipenghui merged commit a761b97 into apache:master Apr 28, 2024
codelipenghui pushed a commit to codelipenghui/incubator-pulsar that referenced this pull request Apr 28, 2024
… replicateSubscriptionState (apache#22572)

(cherry picked from commit a761b97)
codelipenghui pushed a commit to codelipenghui/incubator-pulsar that referenced this pull request Apr 28, 2024
… replicateSubscriptionState (apache#22572)

(cherry picked from commit a761b97)
codelipenghui pushed a commit that referenced this pull request Apr 28, 2024
… replicateSubscriptionState (#22572)

(cherry picked from commit a761b97)
@dao-jun

dao-jun commented Apr 28, 2024

Copy link
Copy Markdown
Member

The PR handled the case of ServerOnlyMarker, but it looks we also need to handle txn aborted messages.
I created a PR for improvement, PTAL
#22610

nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request May 13, 2024
… replicateSubscriptionState (apache#22572)

(cherry picked from commit a761b97)
(cherry picked from commit 1dacca5)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request May 16, 2024
… replicateSubscriptionState (apache#22572)

(cherry picked from commit a761b97)
(cherry picked from commit 1dacca5)
hanmz pushed a commit to hanmz/pulsar that referenced this pull request Feb 12, 2025
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request Apr 15, 2026
…osition implementation for compaction (streamnative#606)

### Motivation

apache/pulsar#22891 introduces some changes for
the PositionImpl. Now we need to use the Position interface instead of
the PositionImpl.

### Modifications

- Refactor to use Position instead of PositionImpl
- apache/pulsar#22572 uses
`getLastDispatchablePosition` instead of `getMaxPosition` to return the
last message id, this patch implements the new method for
`KopPersistentTopic`.
- apache/pulsar#22838 changed the result for
`checkTopicExists`. This PR also adapts to changes.
- apache/pulsar#22882 removed the method
`org.apache.pulsar.broker.service.Producer.updateRates(int
numOfMessages, long msgSizeInBytes)`. We need to use
`producer.getStats().recordMsgIn` instead.

---------

Co-authored-by: Yunze Xu <xyzinfernity@163.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants