Skip to content

Conversation

@lhotari
Copy link
Member

@lhotari lhotari commented Jul 26, 2025

Fixes #24566

Motivation

The test ReplicatorTest.testCloseReplicatorStartProducer was consistently failing due to critical issues introduced in PR #24551. Investigation (shared in #24566 (comment) by @3pacccccc ) revealed that the changes led to infinite recursive calls in ManagedCursorImpl#cancelPendingReadRequest, causing broker instability and test failures.

The root cause was improper state management for pending read operations, particularly when:

  • Multiple threads attempt to cancel pending read requests concurrently
  • Cursors are closed while read operations are pending
  • OpReadEntry instances are recycled and reused incorrectly
  • Cursor state transitions occur without proper synchronization
  • Cursor deletion and closure operations race with active read operations

Modifications

This PR addresses the state management issues through several key changes:

Core Synchronization Fixes

  1. Reverted problematic synchronization approach: Removed the complex pendingReadOpMutex and DelayCheckForNewEntriesTask implementation added in [fix][broker] Fix Broker OOM due to too many waiting cursors and reuse a recycled OpReadEntry incorrectly #24551 that caused infinite recursion, returning to a more robust compare-and-swap based solution.

  2. Added operation ID tracking: Introduced an id field to OpReadEntry instances to prevent recycled operations from being processed incorrectly. This ensures that only the correct operation instance is handled when checking for new entries.

  3. Improved atomic state transitions: Enhanced the CAS (Compare-And-Swap) operations using getAndUpdate() with lambda functions that verify both the operation instance and its ID before making state changes:

                 OpReadEntry waitingReadOpItem = WAITING_READ_OP_UPDATER.getAndUpdate(this,
                         current -> {
                             if (current == op && current.id == opReadId) {
                                 // update the value to null to cancel the waiting read op
                                 return null;
                             } else {
                                 // keep the current waiting read op value
                                 return current;
                             }
                         });
                 // If the waiting read op was the same as the one we are trying to cancel, it means that it was now
                 // cleared from the waitingReadOp field and therefore "cancelled"
                 if (waitingReadOpItem == op && waitingReadOpItem.id == opReadId) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] [{}] Cancelled notification and scheduled read at {}", ledger.getName(),
                                 name, op.readPosition);
                     }
                     PENDING_READ_OPS_UPDATER.incrementAndGet(this);
                     ledger.asyncReadEntries(op);

Enhanced State Management System

  1. Comprehensive state model enhancement: Significantly expanded the cursor State enum with:

    • New states: Deleting, Deleted, DeletingFailed for better deletion lifecycle tracking
    • Added closedState boolean field and isClosed() method for more robust state checking
    • Added isDeletingOrDeleted() method for deletion state validation
    • Enhanced state constructor to properly categorize closed vs. open states
  2. Thread-safe state transition methods: Implemented atomic state transition utilities:

    • changeStateIfNotClosed() - prevents state changes on closed cursors
    • changeStateIfNotDeletingOrDeleted() - prevents state changes on deleting/deleted cursors
    • Enhanced trySetStateToClosing() with atomic operations

Cursor Lifecycle Management

  1. Proper cursor closure handling: Added comprehensive cursor closure logic:

    • closeWaitingCursor() - handles normal cursor closure scenarios
    • cancelWaitingCursorsWhenDeactivated() - handles cursor deactivation scenarios
    • internalCloseWaitingCursor() - unified internal implementation with exception supplier pattern
    • Added sentinel value (WAITING_READ_OP_FOR_CLOSED_CURSOR) to handle closed cursor scenarios
  2. Enhanced cursor deactivation: Updated setInactive() to properly cancel waiting cursors when deactivated, preventing orphaned read operations.

  3. Robust deletion lifecycle: Improved asyncDeleteCursorLedger() with:

    • Proper state transitions through deletion lifecycle
    • Enhanced retry logic with backoff for failed deletions
    • Better error handling and state recovery for deletion failures
    • Prevention of duplicate deletion attempts

Exception Handling Improvements

  1. New exception types: Added CursorDeactivatedWaitCallbackException for better error propagation when cursors are deactivated without properly cancelling pending requests.

  2. Enhanced error propagation: Improved error handling throughout the cursor lifecycle to ensure proper cleanup and user notification.

Managed Ledger Integration

  1. Enhanced cursor deletion in ManagedLedgerImpl: Improved asyncDeleteCursor() with:

    • Proper state checking before deletion attempts
    • Separate handling for durable vs. non-durable cursors
    • Better error recovery and state management
    • Prevention of concurrent deletion attempts
  2. Enhanced waiting cursor registration: Implemented thread-safe registration/deregistration of waiting cursors to prevent race conditions when adding or removing cursors from the waiting list, with improved debug logging.

Specialized Cursor Implementation Updates

  1. Non-durable cursor improvements: Updated NonDurableCursorImpl.asyncClose() to properly close waiting cursors and set inactive state.

  2. Read-only cursor improvements: Updated ReadOnlyCursorImpl.asyncClose() with the same proper cleanup logic.

Misc refactoring in ManagedCursorImpl

  1. In the previous code, the state field was sometimes read directly and sometimes with STATE_UPDATER.get. Similarly, it was sometimes set directly and mostly with STATE_UPDATER.set. There's no need to use STATE_UPDATER.get and STATE_UPDATER.set. The volatile state field can be read and set directly without STATE_UPDATER.
    STATE_UPDATER is only needed for CAS operations on the state field.

Key Technical Improvements

  • Eliminated infinite recursion: The previous implementation could enter infinite recursive calls in cancelPendingReadRequest(), which is now resolved
  • Thread-safe operation lifecycle: OpReadEntry instances are properly tracked through their lifecycle to prevent use-after-recycle scenarios
  • Comprehensive state management: New state model provides better visibility and control over cursor lifecycle
  • Enhanced error handling: Cursor closure and deletion events now properly propagate appropriate exceptions to pending operations
  • Performance optimization: Reduced lock contention by using atomic operations instead of synchronized blocks where possible
  • Improved debugging: Added comprehensive debug logging for cursor registration/deregistration operations
  • Robust deletion handling: Deletion operations now properly handle failures, retries, and state recovery

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, specifically:

  • ReplicatorTest.testCloseReplicatorStartProducer - verifies proper cleanup when replicator cursors are closed
  • FailoverSubscriptionTest.testWaitingCursorsCountAfterSwitchingActiveConsumers - verifies that each cursor doesn't get added more than once into ManagedLedgerImpl's waiting cursors list
  • Existing managed ledger tests that exercise concurrent read operations
  • Integration tests that stress-test cursor state management under load
  • Tests covering cursor deletion and state transitions

The modifications also include timeout adjustments for the test to account for the improved error handling paths.

Impact Assessment

This change affects:

  • Threading model: Improved thread safety in cursor state management with atomic state transitions
  • Error handling: Better propagation of cursor closure and deactivation exceptions
  • Performance: Reduced lock contention in high-throughput scenarios through atomic operations
  • Lifecycle management: More robust cursor deletion and closure handling
  • State consistency: Enhanced state model prevents invalid state transitions
  • Resource cleanup: Better cleanup of waiting cursors and pending operations

The changes are backward compatible and do not affect public APIs or configuration defaults.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.1.0 milestone Jul 26, 2025
@lhotari lhotari self-assigned this Jul 26, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 26, 2025
@lhotari lhotari changed the title [WIP][fix][broker] Fix remaining ManagedCursorImpl issues around state management for pending reads [fix][broker] Fix remaining ManagedCursorImpl issues around state management for pending reads Jul 26, 2025
@lhotari lhotari changed the title [fix][broker] Fix remaining ManagedCursorImpl issues around state management for pending reads [fix][broker] Fix race conditions and infinite recursion in ManagedCursorImpl pending read operations Jul 26, 2025
@lhotari lhotari changed the title [fix][broker] Fix race conditions and infinite recursion in ManagedCursorImpl pending read operations [fix][broker] Fix ManagedCursor state management race conditions and lifecycle issues Jul 26, 2025
@lhotari
Copy link
Member Author

lhotari commented Jul 26, 2025

I learned through some failing tests that there are some really hairy details of the havePendingRead state in dispatcher and the related state in ManagedCursorImpl (waitingReadOp) and ManagedLedgerImpl (waitingCursors). It's like there's a circular runtime dependency of state.

@codecov-commenter
Copy link

codecov-commenter commented Jul 27, 2025

Codecov Report

❌ Patch coverage is 71.17647% with 49 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.32%. Comparing base (bbc6224) to head (7702842).
⚠️ Report is 1250 commits behind head on master.

Files with missing lines Patch % Lines
...che/bookkeeper/mledger/impl/ManagedCursorImpl.java 68.11% 24 Missing and 20 partials ⚠️
...rg/apache/bookkeeper/mledger/impl/OpReadEntry.java 76.92% 2 Missing and 1 partial ⚠️
...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java 85.71% 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24569      +/-   ##
============================================
+ Coverage     73.57%   74.32%   +0.74%     
- Complexity    32624    32674      +50     
============================================
  Files          1877     1880       +3     
  Lines        139502   146446    +6944     
  Branches      15299    16791    +1492     
============================================
+ Hits         102638   108844    +6206     
- Misses        28908    28960      +52     
- Partials       7956     8642     +686     
Flag Coverage Δ
inttests 26.77% <45.88%> (+2.19%) ⬆️
systests 23.29% <55.88%> (-1.03%) ⬇️
unittests 73.82% <71.17%> (+0.98%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
.../bookkeeper/mledger/impl/NonDurableCursorImpl.java 82.00% <100.00%> (+0.75%) ⬆️
...he/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java 95.45% <100.00%> (+0.45%) ⬆️
...ker/service/persistent/PersistentSubscription.java 77.17% <100.00%> (+0.48%) ⬆️
...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java 81.11% <85.71%> (+0.45%) ⬆️
...rg/apache/bookkeeper/mledger/impl/OpReadEntry.java 75.51% <76.92%> (-8.81%) ⬇️
...che/bookkeeper/mledger/impl/ManagedCursorImpl.java 78.26% <68.11%> (-1.04%) ⬇️

... and 1116 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone through the whole PR, overall LGTM except for some coding style issues and some questions I've left. Please address these comments before merging.

P.S. The state machine is hard to read. It would be helpful to have a graph to show all possible state changes, but it's not required.

@lhotari lhotari requested a review from BewareMyPower July 28, 2025 17:13
@lhotari
Copy link
Member Author

lhotari commented Jul 28, 2025

I've gone through the whole PR, overall LGTM except for some coding style issues and some questions I've left. Please address these comments before merging.

P.S. The state machine is hard to read. It would be helpful to have a graph to show all possible state changes, but it's not required.

Thank you @BewareMyPower. I've addressed your review comments. PTAL.

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left a comment about logging but it's not an important issue

@lhotari lhotari dismissed codelipenghui’s stale review July 29, 2025 07:07

@codelipenghui, your review comments have been address. I'm dismissing the "request changes" status so that we could proceed in unblocking Pulsar CI by merging this PR. Please review again.

@lhotari lhotari requested a review from codelipenghui July 29, 2025 07:07
@lhotari lhotari merged commit c96f27a into apache:master Jul 29, 2025
139 of 145 checks passed
lhotari added a commit that referenced this pull request Jul 29, 2025
BewareMyPower pushed a commit that referenced this pull request Jul 29, 2025
lhotari added a commit that referenced this pull request Jul 29, 2025
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 29, 2025
…lifecycle issues (apache#24569)

(cherry picked from commit c96f27a)
(cherry picked from commit ec56ca5)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 29, 2025
…lifecycle issues (apache#24569)

(cherry picked from commit c96f27a)
(cherry picked from commit c324228)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 29, 2025
…lifecycle issues (apache#24569)

(cherry picked from commit c96f27a)
(cherry picked from commit ec56ca5)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 30, 2025
…lifecycle issues (apache#24569)

(cherry picked from commit c96f27a)
(cherry picked from commit ec56ca5)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 31, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 31, 2025
…lifecycle issues (apache#24569)

(cherry picked from commit c96f27a)
(cherry picked from commit c324228)
poorbarcode pushed a commit to poorbarcode/pulsar that referenced this pull request Aug 14, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky-test: ReplicatorTest.testCloseReplicatorStartProducer

6 participants