-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][txn] Fix negative unacknowledged messages in transactions by ensuring that the batch size is added into CommandAck #24443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
codelipenghui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fix looks good to me.
But it seems like the issue was not introduced by #8659
https://github.com/apache/pulsar/pull/8659/files#diff-f6e4c1c4091aa10525f331e48e66b29f22b9f7987755c1b4fb887e24f198bed6R2425-R2426 has the batch_size for the ack command. It seems like the following changes to this method has removed the batch size.
|
@poorbarcode please fix grammar in the title "Negative un-ack messages because foget to carry the batch size info when commit messages list". It's currently not very easy to read and understand. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes negative unacknowledged messages in transactions by ensuring that the batch size is added into CommandAck, and it disables a test that no longer reflects the desired behavior.
- Add batchSize info to CommandAck in the consumer acknowledge logic
- Disable an outdated transaction ack test
- Update test access modifiers and add a new data provider for unack messages tests
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | Adds batchSize to CommandAck to correctly compute acknowledged messages |
| pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java | Disables tests that are no longer valid given the client and broker batch ack behavior |
| pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java | Introduces a new data provider and makes test methods public for consistency |
Comments suppressed due to low confidence (2)
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:3254
- Ensure that adding the batchSize to messageIdData aligns with how batch acknowledgements are handled elsewhere in the code and covers all necessary scenarios.
messageIdData.setBatchSize(messageIdAdv.getBatchSize());
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java:522
- [nitpick] Review the change from private to public on the test method; while making the method public may be necessary for JUnit, confirm that the access level is consistent with the project’s testing conventions.
public void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch) throws Exception {
Ah, you are correct, I have modified the Motivation. |
Fixed |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #24443 +/- ##
============================================
+ Coverage 73.57% 74.31% +0.73%
+ Complexity 32624 32374 -250
============================================
Files 1877 1868 -9
Lines 139502 145498 +5996
Branches 15299 16643 +1344
============================================
+ Hits 102638 108122 +5484
+ Misses 28908 28830 -78
- Partials 7956 8546 +590
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…suring that the batch size is added into CommandAck (apache#24443) (cherry picked from commit 23a2ef2) (cherry picked from commit 8d007dd)
…suring that the batch size is added into CommandAck (apache#24443) (cherry picked from commit 23a2ef2) (cherry picked from commit 50da3cc)
…suring that the batch size is added into CommandAck (apache#24443) (cherry picked from commit 23a2ef2) (cherry picked from commit 8d007dd)
…suring that the batch size is added into CommandAck (apache#24443) (cherry picked from commit 23a2ef2) (cherry picked from commit 50da3cc)
…suring that the batch size is added into CommandAck (apache#24443) (cherry picked from commit 23a2ef2)
…suring that the batch size is added into CommandAck (apache#24443) (cherry picked from commit 23a2ef2)
…suring that the batch size is added into CommandAck (apache#24443)
…suring that the batch size is added into CommandAck (apache#24443)
Motivation
background: #8659 introduced adding
batchSizeintoCommandAckto check conflict commits if enabled TXN.Issue: #15729 added support for acking a message list with transactions. The PR forgot to add
batchSizeintoCommandAckwhen callingconsumer.acknowledgeAsync(List<MessageId>). Thanks @codelipenghui for mentioning this knowledgeThe issue causes a negative value for the consumer's unacknowledged message counter. You can reproduce the issue with the new test
testUnackMessageAfterAckAllMessages(true, true, true). Becauseacknowledgeusesbatch sizeto calculate the messages acknowledged, see: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L673.Modifications
batchSizeintoCommandAckwhen callingconsumer.acknowledgeAsync(List<MessageId>)TransactionEndToEndWithoutBatchIndexAckTest.testAckWithTransactionReduceUnAckMessageCount, because it is meaningless if the client-side enables batch ack and the broker-side disables batch ack(we need a PIP to deny this behavior in the future).Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: x