Skip to content

[fix] Fix Reader can be stuck from transaction aborted messages.#22610

Merged
dao-jun merged 9 commits into
apache:masterfrom
dao-jun:improve/filter-aborted-txns
May 7, 2024
Merged

[fix] Fix Reader can be stuck from transaction aborted messages.#22610
dao-jun merged 9 commits into
apache:masterfrom
dao-jun:improve/filter-aborted-txns

Conversation

@dao-jun

@dao-jun dao-jun commented Apr 28, 2024

Copy link
Copy Markdown
Member

Motivation

Fix Reader can be stuck from transaction aborted messages.
Related to #22572

Since Reader's SubscriptionType is Exclusive, so we no need to handle DelayedDelivery messages.

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@shibd

shibd commented Apr 29, 2024

Copy link
Copy Markdown
Member

hi, @tjiuming Thanks for your PR.

Maybe we can change this test to cover all related transactions get lastmessage id case.

public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {

Change to:

@Test
    public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
        // 1. Prepare environment
        String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions";
        String subName = "my-subscription";
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(topic)
                .create();
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subName)
                .subscribe();

        // 2. Test last max read position can be required correctly.
        // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
        MessageIdImpl expectedLastMessageID = null;
        for (int i = 0; i < 3; i++) {
            expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
        }
        assertMessageId(consumer, expectedLastMessageID);
        // 2.2 Case2: send 2 ongoing transactional messages and 2 original messages.
        // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
        Transaction txn1 = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.HOURS)
                .build()
                .get();
        Transaction txn2 = pulsarClient.newTransaction()
                .withTransactionTimeout(5, TimeUnit.HOURS)
                .build()
                .get();

        // |1:0|1:1|1:2|txn1:1:3|
        producer.newMessage(txn1).send();
        
        // |1:0|1:1|1:2|txn1:1:3|1:4|
        MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();

        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
        producer.newMessage(txn2).send();
        
        // 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
        assertMessageId(consumer, expectedLastMessageID);

        // 2.2.2 Last message ID will update to 1:4 when txn1 committed.
        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
        txn1.commit().get(5, TimeUnit.SECONDS);
        assertMessageId(consumer, expectedLastMessageID1);

        // 2.2.3 Last message ID will still to 1:4 when txn2 aborted.
        // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
        txn2.abort().get(5, TimeUnit.SECONDS);
        assertMessageId(consumer, expectedLastMessageID1);
    }

@dao-jun

dao-jun commented Apr 29, 2024

Copy link
Copy Markdown
Member Author

@shibd Hi baodi, I've addressed your comment, thanks!

@coderzc coderzc left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't need maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0 judgment condition, since maxReadPosition also may not be a valid position for transaction topic event if maxReadPosition < lastPosition.

@dao-jun

dao-jun commented Apr 29, 2024

Copy link
Copy Markdown
Member Author

It seems we don't need maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0 judgment condition, since maxReadPosition also may not be a valid position for transaction topic event if maxReadPosition < lastPosition.

I don't understand, why?

@coderzc

coderzc commented Apr 29, 2024

Copy link
Copy Markdown
Member

It seems we don't need maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0 judgment condition, since maxReadPosition also may not be a valid position for transaction topic event if maxReadPosition < lastPosition.

I don't understand, why?

@dao-jun maxReadPosition is just the position in front of the ongoing transaction. It can only ensure that this is not a pending transaction position. but it's not necessarily a normal message.

See:

void updateMaxReadPosition(TxnID txnID) {
PositionImpl preMaxReadPosition = this.maxReadPosition;
ongoingTxns.remove(txnID);
if (!ongoingTxns.isEmpty()) {
PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
} else {
maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
}
if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
}
}

@dao-jun

dao-jun commented Apr 29, 2024

Copy link
Copy Markdown
Member Author

@coderzc Oh, I understand you, if the maxReadPosition is also an aborted message, the reader can be stuck too. It makes sense, I'll improve test to cover the case.

@dao-jun

dao-jun commented Apr 29, 2024

Copy link
Copy Markdown
Member Author

@coderzc PTAL

@dao-jun

dao-jun commented Apr 29, 2024

Copy link
Copy Markdown
Member Author

@codelipenghui @shibd @coderzc It looks like Shared/key_Shared Consumer also could be stuck from delayed delivery messages, WDYT? Do we need to handle it?

@shibd

shibd commented Apr 29, 2024

Copy link
Copy Markdown
Member

@codelipenghui @shibd @coderzc It looks like Shared/key_Shared Consumer also could be stuck from delayed delivery messages, WDYT? Do we need to handle it?

I prefer case by case fix it and adding enough unit tests to cover every case.

@codecov-commenter

Copy link
Copy Markdown

Codecov Report

Attention: Patch coverage is 90.00000% with 1 lines in your changes are missing coverage. Please review.

Project coverage is 72.71%. Comparing base (bbc6224) to head (59d227f).
Report is 218 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #22610      +/-   ##
============================================
- Coverage     73.57%   72.71%   -0.86%     
+ Complexity    32624    32532      -92     
============================================
  Files          1877     1887      +10     
  Lines        139502   141004    +1502     
  Branches      15299    15477     +178     
============================================
- Hits         102638   102534     -104     
- Misses        28908    30603    +1695     
+ Partials       7956     7867      -89     
Flag Coverage Δ
inttests 27.40% <40.00%> (+2.82%) ⬆️
systests 24.84% <40.00%> (+0.51%) ⬆️
unittests 71.46% <90.00%> (-1.39%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
...ookkeeper/mledger/util/ManagedLedgerImplUtils.java 68.18% <100.00%> (ø)
...sar/broker/service/persistent/PersistentTopic.java 79.06% <88.88%> (+0.60%) ⬆️

... and 342 files with indirect coverage changes

lhotari
lhotari previously requested changes May 3, 2024

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding #22572 (comment),
to further reduce code duplication, start internalAsyncReverseFindPositionOneByOne method with this part

    if (!ledger.isValidPosition(previousPosition)) {
        future.complete(previousPosition);
        return;
    }

and then this logic can be removed from asyncGetLastValidPosition and the readEntryComplete callback.

@dao-jun

dao-jun commented May 5, 2024

Copy link
Copy Markdown
Member Author

Regarding #22572 (comment), to further reduce code duplication, start internalAsyncReverseFindPositionOneByOne method with this part

    if (!ledger.isValidPosition(previousPosition)) {
        future.complete(previousPosition);
        return;
    }

and then this logic can be removed from asyncGetLastValidPosition and the readEntryComplete callback.

addressed

@dao-jun dao-jun requested a review from lhotari May 5, 2024 04:17
@shibd shibd requested a review from coderzc May 6, 2024 07:42
@dao-jun

dao-jun commented May 6, 2024

Copy link
Copy Markdown
Member Author

@lhotari PTAL

@dao-jun dao-jun merged commit 7e88463 into apache:master May 7, 2024
@dao-jun dao-jun deleted the improve/filter-aborted-txns branch May 7, 2024 12:45
lhotari pushed a commit that referenced this pull request May 14, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request May 15, 2024
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request May 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants