Skip to content

Conversation

@poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Jun 23, 2025

Motivation

background: #8659 introduced adding batchSize into CommandAck to check conflict commits if enabled TXN.

Issue: #15729 added support for acking a message list with transactions. The PR forgot to add batchSize into CommandAck when calling consumer.acknowledgeAsync(List<MessageId>). Thanks @codelipenghui for mentioning this knowledge

The issue causes a negative value for the consumer's unacknowledged message counter. You can reproduce the issue with the new test testUnackMessageAfterAckAllMessages(true, true, true). Because acknowledge uses batch size to calculate the messages acknowledged, see: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L673.

private long getAckedCountForTransactionAck(int batchSize, long[] ackSets) {
    BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets);
    long ackedCount = batchSize - bitset.cardinality();
    bitset.recycle();
    return ackedCount;
}

Modifications

  • Add batchSize into CommandAck when calling consumer.acknowledgeAsync(List<MessageId>)
  • Disable the test TransactionEndToEndWithoutBatchIndexAckTest.testAckWithTransactionReduceUnAckMessageCount, because it is meaningless if the client-side enables batch ack and the broker-side disables batch ack(we need a PIP to deny this behavior in the future).

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added this to the 4.1.0 milestone Jun 23, 2025
@poorbarcode poorbarcode self-assigned this Jun 23, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 23, 2025
@poorbarcode poorbarcode changed the title [fix][txn]Foget to carry the batch size info when commit messages list [fix][txn]Negative un-ack messages because foget to carry the batch size info when commit messages list Jun 23, 2025
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fix looks good to me.

But it seems like the issue was not introduced by #8659

https://github.com/apache/pulsar/pull/8659/files#diff-f6e4c1c4091aa10525f331e48e66b29f22b9f7987755c1b4fb887e24f198bed6R2425-R2426 has the batch_size for the ack command. It seems like the following changes to this method has removed the batch size.

@codelipenghui
Copy link
Contributor

@lhotari
Copy link
Member

lhotari commented Jun 23, 2025

@poorbarcode please fix grammar in the title "Negative un-ack messages because foget to carry the batch size info when commit messages list". It's currently not very easy to read and understand.
Using LLM to fix grammar is very help (instructions).

@lhotari lhotari requested a review from Copilot June 23, 2025 18:29
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes negative unacknowledged messages in transactions by ensuring that the batch size is added into CommandAck, and it disables a test that no longer reflects the desired behavior.

  • Add batchSize info to CommandAck in the consumer acknowledge logic
  • Disable an outdated transaction ack test
  • Update test access modifiers and add a new data provider for unack messages tests

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

File Description
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java Adds batchSize to CommandAck to correctly compute acknowledged messages
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java Disables tests that are no longer valid given the client and broker batch ack behavior
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java Introduces a new data provider and makes test methods public for consistency
Comments suppressed due to low confidence (2)

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:3254

  • Ensure that adding the batchSize to messageIdData aligns with how batch acknowledgements are handled elsewhere in the code and covers all necessary scenarios.
                messageIdData.setBatchSize(messageIdAdv.getBatchSize());

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java:522

  • [nitpick] Review the change from private to public on the test method; while making the method public may be necessary for JUnit, confirm that the access level is consistent with the project’s testing conventions.
    public void testAckWithTransactionReduceUnAckMessageCount(boolean enableBatch) throws Exception {

@poorbarcode poorbarcode changed the title [fix][txn]Negative un-ack messages because foget to carry the batch size info when commit messages list [fix][txn] Fix negative unacknowledged messages in transactions by ensuring that the batch size is added into CommandAck Jun 24, 2025
@poorbarcode
Copy link
Contributor Author

@codelipenghui

@poorbarcode The issue should be introduced by https://github.com/apache/pulsar/pull/15729/files#diff-f6e4c1c4091aa10525f331e48e66b29f22b9f7987755c1b4fb887e24f198bed6R2772-R2773

Ah, you are correct, I have modified the Motivation.

@poorbarcode
Copy link
Contributor Author

@lhotari

@poorbarcode please fix grammar in the title "Negative un-ack messages because foget to carry the batch size info when commit messages list". It's currently not very easy to read and understand.
Using LLM to fix grammar is very help (instructions).

Fixed

@codecov-commenter
Copy link

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 74.31%. Comparing base (bbc6224) to head (9c0694a).
Report is 1160 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24443      +/-   ##
============================================
+ Coverage     73.57%   74.31%   +0.73%     
+ Complexity    32624    32374     -250     
============================================
  Files          1877     1868       -9     
  Lines        139502   145498    +5996     
  Branches      15299    16643    +1344     
============================================
+ Hits         102638   108122    +5484     
+ Misses        28908    28830      -78     
- Partials       7956     8546     +590     
Flag Coverage Δ
inttests 26.99% <0.00%> (+2.41%) ⬆️
systests 23.31% <0.00%> (-1.01%) ⬇️
unittests 73.79% <100.00%> (+0.95%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 80.28% <100.00%> (+2.71%) ⬆️

... and 1086 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@Technoboy- Technoboy- merged commit 23a2ef2 into apache:master Jun 24, 2025
157 of 164 checks passed
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

poorbarcode added a commit that referenced this pull request Jul 7, 2025
…suring that the batch size is added into CommandAck (#24443)

(cherry picked from commit 23a2ef2)
poorbarcode added a commit that referenced this pull request Jul 7, 2025
…suring that the batch size is added into CommandAck (#24443)

(cherry picked from commit 23a2ef2)
poorbarcode added a commit that referenced this pull request Jul 7, 2025
…suring that the batch size is added into CommandAck (#24443)

(cherry picked from commit 23a2ef2)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
…suring that the batch size is added into CommandAck (apache#24443)

(cherry picked from commit 23a2ef2)
(cherry picked from commit 8d007dd)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
…suring that the batch size is added into CommandAck (apache#24443)

(cherry picked from commit 23a2ef2)
(cherry picked from commit 50da3cc)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 23, 2025
…suring that the batch size is added into CommandAck (apache#24443)

(cherry picked from commit 23a2ef2)
(cherry picked from commit 8d007dd)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 24, 2025
…suring that the batch size is added into CommandAck (apache#24443)

(cherry picked from commit 23a2ef2)
(cherry picked from commit 50da3cc)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 28, 2025
…suring that the batch size is added into CommandAck (apache#24443)

(cherry picked from commit 23a2ef2)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 28, 2025
…suring that the batch size is added into CommandAck (apache#24443)

(cherry picked from commit 23a2ef2)
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
…suring that the batch size is added into CommandAck (apache#24443)
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
…suring that the batch size is added into CommandAck (apache#24443)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants