@BewareMyPower
Contributor

Fixes #24724

Modifications

Change pendingIndividualBatchIndexAcks to a ConcurrentSkipListMap and call pollFirstEntry() in the loop, so that concurrent flush calls cannot pass the same ConcurrentBitSetRecyclable reference to newMultiMessageAckCommon from different threads. That method recycles the object, so passing the same reference twice recycles it twice.
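
To illustrate the idea, here is a minimal sketch (not the code in this PR) of draining a ConcurrentSkipListMap from several threads. The class and method names are hypothetical, and `Long` keys with `java.util.BitSet` values stand in for the real message IDs and ConcurrentBitSetRecyclable. Because `pollFirstEntry()` removes and returns an entry atomically, each entry is claimed by exactly one caller, and a `null` result means the map is empty (possibly drained by another thread).

```java
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the tracker; not the actual Pulsar class.
final class PollFirstEntryDrain {

    /** Claims entries one at a time and returns how many this caller got. */
    static int drain(ConcurrentSkipListMap<Long, BitSet> pending) {
        int claimed = 0;
        Map.Entry<Long, BitSet> entry;
        // pollFirstEntry() atomically removes and returns the first entry,
        // or returns null when the map is empty.
        while ((entry = pending.pollFirstEntry()) != null) {
            // The real code would hand entry.getValue() to the ack command here.
            claimed++;
        }
        return claimed;
    }

    /** Runs several concurrent drains and returns the total number of claimed entries. */
    static int drainConcurrently(int entries, int threads) {
        ConcurrentSkipListMap<Long, BitSet> pending = new ConcurrentSkipListMap<>();
        for (long i = 0; i < entries; i++) {
            pending.put(i, new BitSet());
        }
        AtomicInteger total = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> total.addAndGet(drain(pending)));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return total.get();
    }

    public static void main(String[] args) {
        // Every entry is claimed exactly once, so the total equals the entry count.
        System.out.println(drainConcurrently(10_000, 4)); // prints 10000
    }
}
```

If any entry were handed to two threads, the total would exceed the number of entries inserted.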

Verifying this change

It's hard to write a unit test because this race condition is hard to simulate.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

Additional information

This PR is a simple fix for the issue that avoids refactoring. However, it might not be worth allowing flush to be called concurrently. It's hard to test the race condition and could send

The only advantage of allowing concurrent flush calls is avoiding a coarse-grained lock on all methods that access the following fields:

  • pendingIndividualAcks
  • pendingIndividualBatchIndexAcks
  • lastCumulativeAck

However, from my perspective, it's a premature optimization that makes the code error-prone. Acquiring a lock for every acknowledgeAsync call should not be a bottleneck on the consumer side, where the time-consuming tasks are mainly the business logic that handles messages and the receive calls, which acquire many locks as well.
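
For comparison, the coarse-grained alternative could look roughly like the sketch below. This is not part of this PR: the class name, the `Long` message IDs, and the single `pendingAcks` list are simplifying assumptions. Every method that touches the pending state takes the same lock, so a flush swaps the pending list out atomically and no entry can be seen by two flush calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a tracker guarded by one coarse-grained lock.
final class CoarseLockedAckTracker {
    private final Object lock = new Object();
    private List<Long> pendingAcks = new ArrayList<>();

    /** Records an acknowledgment under the lock. */
    void acknowledge(long messageId) {
        synchronized (lock) {
            pendingAcks.add(messageId);
        }
    }

    /** Takes ownership of everything pending; concurrent callers never share an entry. */
    List<Long> flush() {
        synchronized (lock) {
            List<Long> batch = pendingAcks;
            pendingAcks = new ArrayList<>();
            return batch;
        }
    }

    public static void main(String[] args) {
        CoarseLockedAckTracker tracker = new CoarseLockedAckTracker();
        tracker.acknowledge(1L);
        tracker.acknowledge(2L);
        System.out.println(tracker.flush().size()); // prints 2
        System.out.println(tracker.flush().size()); // prints 0
    }
}
```

The trade-off is that acknowledge and flush calls serialize on the lock, which is the cost argued above to be negligible next to message handling.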

The use of ConcurrentBitSetRecyclable is also a premature optimization, and it results in the bug described in #24724.

As I've shared in https://lists.apache.org/thread/b5r13oz24y935p6o8tfwf578xk35wwpf, sharing a recyclable object among different threads is not a good practice: it introduces more overhead and bypasses the performance benefits of the thread-local stacks. There might still be other potential issues with the use of ConcurrentBitSetRecyclable, so I might open another PR for the code refactoring.
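
As a rough illustration of why recycling the same object twice is dangerous, consider the toy pool below. It is a deliberately naive, single-threaded stand-in written for this example, not the recycler that ConcurrentBitSetRecyclable actually uses, and all names in it are hypothetical. After a double recycle, two later borrowers receive the same instance and would overwrite each other's bits.

```java
import java.util.ArrayDeque;
import java.util.BitSet;

// Toy object pool for illustration only; it has no guard against double recycling.
final class ToyBitSetPool {
    private final ArrayDeque<BitSet> free = new ArrayDeque<>();

    /** Returns a pooled instance if one is available, otherwise a new one. */
    BitSet acquire() {
        BitSet reused = free.poll();
        return reused != null ? reused : new BitSet();
    }

    /** Clears the instance and returns it to the pool. */
    void recycle(BitSet bits) {
        bits.clear();
        free.push(bits);
    }

    /** Recycles one instance twice and reports whether two borrowers then share it. */
    static boolean doubleRecycleAliases() {
        ToyBitSetPool pool = new ToyBitSetPool();
        BitSet shared = pool.acquire();
        pool.recycle(shared);
        pool.recycle(shared); // the bug: the same reference is recycled again
        BitSet a = pool.acquire();
        BitSet b = pool.acquire();
        return a == b;
    }

    public static void main(String[] args) {
        System.out.println(doubleRecycleAliases()); // prints true
    }
}
```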

Copilot AI left a comment

Pull Request Overview

This PR fixes a concurrency issue where ConcurrentBitSetRecyclable objects were being recycled incorrectly when multiple threads called flush() simultaneously. The fix changes the data structure from ConcurrentHashMap to ConcurrentSkipListMap and uses atomic polling operations to prevent race conditions.

  • Replace ConcurrentHashMap with ConcurrentSkipListMap for pendingIndividualBatchIndexAcks
  • Use pollFirstEntry() instead of iterator-based removal to ensure atomic operations
  • Add null check to handle concurrent removal scenarios

@codecov-commenter

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.14%. Comparing base (415c6fa) to head (40fb81f).
⚠️ Report is 2 commits behind head on master.

Additional details and impacted files

@@             Coverage Diff              @@
##             master   #24725      +/-   ##
============================================
- Coverage     74.27%   74.14%   -0.14%     
+ Complexity    33569    33117     -452     
============================================
  Files          1896     1896              
  Lines        148111   148109       -2     
  Branches      17164    17163       -1     
============================================
- Hits         110009   109814     -195     
- Misses        29370    29519     +149     
- Partials       8732     8776      +44     
Flag Coverage Δ
inttests 26.48% <57.14%> (-0.01%) ⬇️
systests 22.68% <100.00%> (+0.01%) ⬆️
unittests 73.65% <100.00%> (-0.15%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...impl/PersistentAcknowledgmentsGroupingTracker.java 87.65% <100.00%> (-0.08%) ⬇️

... and 83 files with indirect coverage changes

@BewareMyPower BewareMyPower merged commit 14543d3 into apache:master Sep 11, 2025
96 of 98 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-ack-flush-race branch September 11, 2025 14:19
lhotari pushed a commit that referenced this pull request Sep 11, 2025
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 12, 2025
…ng different threads (apache#24725)

(cherry picked from commit 14543d3)
(cherry picked from commit ba50b60)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 12, 2025
…ng different threads (apache#24725)

(cherry picked from commit 14543d3)
(cherry picked from commit ba50b60)
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
…ng different threads (apache#24725)

(cherry picked from commit 14543d3)
(cherry picked from commit 498571d)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
…ng different threads (apache#24725)

(cherry picked from commit 14543d3)
(cherry picked from commit 498571d)
nodece pushed a commit to nodece/pulsar that referenced this pull request Sep 16, 2025
@lhotari lhotari added this to the 4.2.0 milestone Sep 17, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Development

Successfully merging this pull request may close these issues.

[Bug] Batch index ACK might recycle ConcurrentBitSetRecyclable twice

5 participants