[fix] Fix Reader can be stuck from transaction aborted messages. #22610
|
Hi @tjiuming, thanks for your PR. Maybe we can change this test to cover all the transaction-related cases of getting the last message ID. Change it to:
@Test
public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
    // 1. Prepare environment
    String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions";
    String subName = "my-subscription";
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .create();
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();

    // 2. Test that the last max read position can be acquired correctly.
    // 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
    MessageIdImpl expectedLastMessageID = null;
    for (int i = 0; i < 3; i++) {
        expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
    }
    assertMessageId(consumer, expectedLastMessageID);

    // 2.2 Case2: send 2 ongoing transactional messages and 1 original message.
    // |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|
    Transaction txn1 = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.HOURS)
            .build()
            .get();
    Transaction txn2 = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.HOURS)
            .build()
            .get();
    // |1:0|1:1|1:2|txn1:1:3|
    producer.newMessage(txn1).send();
    // |1:0|1:1|1:2|txn1:1:3|1:4|
    MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();
    // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
    producer.newMessage(txn2).send();

    // 2.2.1 The last message ID does not change while txn1 and txn2 are still ongoing.
    assertMessageId(consumer, expectedLastMessageID);

    // 2.2.2 The last message ID moves to 1:4 once txn1 is committed.
    // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|txn1:commit->1:6|
    txn1.commit().get(5, TimeUnit.SECONDS);
    assertMessageId(consumer, expectedLastMessageID1);

    // 2.2.3 The last message ID is still 1:4 after txn2 is aborted.
    // |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|txn1:commit->1:6|txn2:abort->1:7|
    txn2.abort().get(5, TimeUnit.SECONDS);
    assertMessageId(consumer, expectedLastMessageID1);
} |
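For readers following along without a broker, the positions asserted in the test above can be reproduced with a small in-memory model. This is only a sketch under assumptions: `TxnLogModel` and all of its methods are hypothetical names, not Pulsar code. It assumes that the max read position sits just before the first message of any ongoing transaction, and that the last message ID is found by walking backwards from there while skipping commit/abort markers and aborted messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a single-ledger topic log. Hypothetical; not the broker implementation. */
final class TxnLogModel {
    enum State { ONGOING, COMMITTED, ABORTED }

    /** One log entry: a message (txn == null when non-transactional) or a commit/abort marker. */
    private static final class Entry {
        final String txn;
        final boolean marker;

        Entry(String txn, boolean marker) {
            this.txn = txn;
            this.marker = marker;
        }
    }

    private final List<Entry> log = new ArrayList<>();
    private final Map<String, State> txnState = new HashMap<>();

    /** Appends a non-transactional message and returns its entry index. */
    int send() {
        return append(new Entry(null, false));
    }

    /** Appends a message that belongs to the given transaction and returns its entry index. */
    int send(String txn) {
        txnState.putIfAbsent(txn, State.ONGOING);
        return append(new Entry(txn, false));
    }

    /** Ends a transaction; the commit/abort marker occupies a log position too. */
    void end(String txn, State finalState) {
        txnState.put(txn, finalState);
        append(new Entry(txn, true));
    }

    private int append(Entry e) {
        log.add(e);
        return log.size() - 1;
    }

    /** Index just before the first message of an ongoing transaction, or the log end if none. */
    int maxReadPosition() {
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (!e.marker && e.txn != null && txnState.get(e.txn) == State.ONGOING) {
                return i - 1;
            }
        }
        return log.size() - 1;
    }

    /** Walks backwards from maxReadPosition, skipping markers and aborted messages; -1 if none. */
    int lastMessageId() {
        for (int i = maxReadPosition(); i >= 0; i--) {
            Entry e = log.get(i);
            boolean aborted = e.txn != null && txnState.get(e.txn) == State.ABORTED;
            if (!e.marker && !aborted) {
                return i;
            }
        }
        return -1;
    }
}
```

Replaying the test scenario against this model gives entry 2 after the three plain sends, still 2 while both transactions are ongoing, 4 after txn1 commits, and 4 again after txn2 aborts, which matches the `1:2` and `1:4` positions in the test comments.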
|
@shibd Hi baodi, I've addressed your comment, thanks! |
I don't understand, why? |
@dao-jun See: |
|
@coderzc Oh, I see: if the maxReadPosition also points at an aborted message, the reader can get stuck too. That makes sense, I'll improve the test to cover this case. |
|
@coderzc PTAL |
|
@codelipenghui @shibd @coderzc It looks like Shared/Key_Shared consumers could also get stuck on delayed-delivery messages, WDYT? Do we need to handle it? |
I prefer fixing it case by case and adding enough unit tests to cover every case. |
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #22610      +/-   ##
============================================
- Coverage     73.57%   72.71%   -0.86%
+ Complexity    32624    32532      -92
============================================
  Files          1877     1887      +10
  Lines        139502   141004    +1502
  Branches      15299    15477     +178
============================================
- Hits         102638   102534     -104
- Misses        28908    30603    +1695
+ Partials       7956     7867      -89
|
lhotari
left a comment
Regarding #22572 (comment): to further reduce code duplication, start the internalAsyncReverseFindPositionOneByOne method with this check:
    if (!ledger.isValidPosition(previousPosition)) {
        future.complete(previousPosition);
        return;
    }
Then this logic can be removed from both asyncGetLastValidPosition and the readEntryComplete callback.
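To illustrate the shape of that refactoring, here is a minimal, self-contained sketch. It is an assumption-laden model, not the managed-ledger code: `ReverseFindSketch`, `asyncReadEntry`, and `reverseFindOneByOne` are hypothetical stand-ins, positions are plain list indexes, and "reading an entry" is an already-completed future. The point it shows is that once the validity guard is the first statement of the recursive step, neither the entry point nor the read callback needs its own copy of the check.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

/** Hypothetical model of a one-by-one reverse search with a single validity guard. */
final class ReverseFindSketch {
    private final List<String> entries;

    ReverseFindSketch(List<String> entries) {
        this.entries = entries;
    }

    /** A position is valid when it points inside the stored entries. */
    boolean isValidPosition(int position) {
        return position >= 0 && position < entries.size();
    }

    /** Stand-in for an asynchronous entry read; completes immediately in this sketch. */
    CompletableFuture<String> asyncReadEntry(int position) {
        return CompletableFuture.completedFuture(entries.get(position));
    }

    /** Entry point: no validity check here, the recursive step owns it. */
    CompletableFuture<Integer> asyncGetLastValidPosition(Predicate<String> accept, int start) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        reverseFindOneByOne(accept, start, future);
        return future;
    }

    private void reverseFindOneByOne(Predicate<String> accept, int position,
                                     CompletableFuture<Integer> future) {
        // The only guard: an invalid position ends the search and is returned as-is.
        if (!isValidPosition(position)) {
            future.complete(position);
            return;
        }
        asyncReadEntry(position).whenComplete((entry, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else if (accept.test(entry)) {
                future.complete(position);
            } else {
                // No validity check in the callback; the next call performs it.
                reverseFindOneByOne(accept, position - 1, future);
            }
        });
    }
}
```

In this model, a search that rejects every entry completes with the out-of-range index just before the first entry, mirroring how the snippet above completes the future with `previousPosition` itself.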
addressed |
|
@lhotari PTAL |
) (cherry picked from commit 7e88463)
…che#22610) (cherry picked from commit 7e88463) (cherry picked from commit f516a85)
Motivation
Fix Reader can be stuck from transaction aborted messages.
Related to #22572
Since a Reader's SubscriptionType is Exclusive, we do not need to handle DelayedDelivery messages.
Modifications