@poorbarcode
Contributor

Motivation

Background

1. A read stays pending when there are no entries to read.

  • The cursor adds itself to managedledger.waitingCursors if there are no entries to read.
  • The managed ledger calls cursor.notifyEntriesAvailable once new entries are added.

2. Double-check whether there are more entries.

  • The cursor performs an additional check if there are no entries to read [code-1].
  • The cursor delays this second check if the configuration newEntriesCheckDelayInMillis is greater than 0 [code-1].

3. The variable waitingReadOp of the cursor

  • It holds an OpReadEntry that triggers a new read once the cursor is notified that new entries were added.
  • It prevents more than one pending read from being in progress at the same time.

4. The variable waitingCursors of Managed Ledger

  • It maintains the list of cursors that are waiting for new entries.

Issue 1: The same cursor was added to managedledger.waitingCursors thousands of times.

  • Configurations
    • subscription type: Failover
  • Reproduce flow
    • Add consumer(C1); it becomes an active consumer.
    • Add consumer(C2); the dispatcher then selects C2 as the active consumer.
      • This triggers scheduleReadOnActiveConsumer.
      • (Highlight) The method scheduleReadOnActiveConsumer resets the variable cursor.waitingReadOp, but it does not remove the cursor from managedledger.waitingCursors.
      • The resulting issue is the same as [Bug] Broker memory leak #22157. You can reproduce it with the new test testWaitingCursorsCountAfterSwitchingActiveConsumers.
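
The reproduce flow above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: `Ledger`, `Cursor`, `readOrWait`, `cancelPendingReadBuggy`, and `cancelPendingReadFixed` are hypothetical names invented for illustration and are not the Pulsar classes or methods. It shows how clearing the waiting read op without removing the queue entry lets the same cursor accumulate in the queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified model; these are not the Pulsar classes. */
final class WaitingCursorLeakSketch {
    /** Stands in for the managed ledger and its waitingCursors collection. */
    static final class Ledger {
        final Queue<Cursor> waitingCursors = new ConcurrentLinkedQueue<>();
    }

    /** Stands in for the cursor and its waitingReadOp field. */
    static final class Cursor {
        final Ledger ledger;
        final AtomicReference<Object> waitingReadOp = new AtomicReference<>();

        Cursor(Ledger ledger) {
            this.ledger = ledger;
        }

        /** No entries to read: park the read and register for notification. */
        boolean readOrWait() {
            if (!waitingReadOp.compareAndSet(null, new Object())) {
                return false; // a pending read already exists
            }
            ledger.waitingCursors.add(this);
            return true;
        }

        /** Buggy cancel: clears the op but leaves the queue entry behind. */
        void cancelPendingReadBuggy() {
            waitingReadOp.set(null);
        }

        /** Cancel that keeps both pieces of state in sync. */
        void cancelPendingReadFixed() {
            if (waitingReadOp.getAndSet(null) != null) {
                ledger.waitingCursors.remove(this);
            }
        }
    }

    /** Simulates repeated active-consumer switches and returns the queue size. */
    static int simulate(int switches, boolean fixed) {
        Ledger ledger = new Ledger();
        Cursor cursor = new Cursor(ledger);
        for (int i = 0; i < switches; i++) {
            cursor.readOrWait();
            if (fixed) {
                cursor.cancelPendingReadFixed();
            } else {
                cursor.cancelPendingReadBuggy();
            }
        }
        return ledger.waitingCursors.size();
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + simulate(1000, false)); // prints buggy: 1000
        System.out.println("fixed: " + simulate(1000, true));  // prints fixed: 0
    }
}
```

In this model every switch adds one more reference to the same cursor, which matches the "thousands of times" symptom in the issue title.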

Issue 2: A recycled OpReadEntry is reused, which causes repeated delivery and other unexpected issues.

| time | cursor -> read entries | cursor -> cancel pending read | Other tasks |
| --- | --- | --- | --- |
| 1 | Has no entries to read | | |
| 2 | Creates an OpReadEntry and sets it to cursor.waitingReadOp | | |
| 3 | Schedules a delayed task to double-check whether Managed Ledger has entries to read | | |
| 4 | The delayed task is waiting to run | | |
| 5 | | Starts to cancel the pending read | |
| 6 | | Clears cursor.waitingReadOp | |
| 7 | | Recycles the stored OpReadEntry, which was created by the read entries task | |
| 8 | Runs the delayed task | | |
| 9 | | | Reuses the recycled OpReadEntry |
| 10 | Uses the OpReadEntry to read entries, but it has already been recycled and reused by another task | | |
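
The timeline above can be replayed on a single thread with a toy object pool. This is a minimal sketch, assuming a last-in-first-out pool that hands back the most recently recycled instance. `PooledOp`, `create`, `owner`, and `replay` are hypothetical names for illustration and do not reflect how OpReadEntry or its recycler is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a pooled read operation; not the Pulsar OpReadEntry. */
final class RecycledOpSketch {
    static final class PooledOp {
        // Toy LIFO pool (an assumption of this sketch).
        private static final Deque<PooledOp> POOL = new ArrayDeque<>();
        String owner;      // which task currently owns this instance
        long readPosition; // state that recycle() resets

        static PooledOp create(String owner, long readPosition) {
            PooledOp op = POOL.isEmpty() ? new PooledOp() : POOL.pop();
            op.owner = owner;
            op.readPosition = readPosition;
            return op;
        }

        void recycle() {
            owner = null;
            readPosition = -1L;
            POOL.push(this);
        }
    }

    /** Replays steps 2 to 10 and returns the state the delayed task observes. */
    static String replay() {
        PooledOp waitingReadOp = PooledOp.create("read-entries", 42L); // step 2
        PooledOp capturedByDelayedTask = waitingReadOp;                // step 3: the task keeps a reference
        waitingReadOp.recycle();                                       // steps 5 to 7: cancel recycles the op
        PooledOp reused = PooledOp.create("other-task", 7L);           // step 9: the pool returns the same instance
        if (capturedByDelayedTask != reused) {
            throw new IllegalStateException("pool did not hand back the same instance");
        }
        // Steps 8 and 10: the delayed task now sees the other task's state.
        return capturedByDelayedTask.owner + "@" + capturedByDelayedTask.readPosition;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints other-task@7
    }
}
```

The delayed task still holds its original reference, yet the object behind it now describes a different read, which is one way a position could be read and delivered twice.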

Issue 3: The method cursor.checkForNewEntries does not remove the cursor from managedledger.waitingCursors.

  • cursor.checkForNewEntries implements the double-check that checks whether there are entries to read.
  • If the check finds entries to read, it clears cursor.waitingReadOp and triggers a read.
  • However, it does not remove the cursor from managedledger.waitingCursors after clearing the variable cursor.waitingReadOp.

            // [code-1] Register the read as the cursor's single waiting read op.
            if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                // Another read is already waiting: recycle this op and fail the callback.
                op.recycle();
                callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
                return;
            }
            // Double-check for new entries, either after the configured delay or right away.
            if (getConfig().getNewEntriesCheckDelayInMillis() > 0) {
                ledger.getScheduledExecutor()
                        .schedule(() -> checkForNewEntries(op, callback, ctx),
                                getConfig().getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
            } else {
                // If there's no delay, check directly from the same thread
                checkForNewEntries(op, callback, ctx);
            }

            // Inside checkForNewEntries: the double-check itself.
            if (hasMoreEntries()) {
                // Clear waitingReadOp only if it still holds this op, then start the read.
                if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Cancelled notification and scheduled read at {}", ledger.getName(),
                                name, op.readPosition);
                    }
                    PENDING_READ_OPS_UPDATER.incrementAndGet(this);
                    ledger.asyncReadEntries(op);
                }
            }

Modifications

  • Fix the three issues described above.
  • Instead of modifying managedledger.waitingCursors and cursor.waitingReadOp from many call sites, Managed Cursor now manages both variables itself.
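
One way to picture the second bullet is a cursor that changes both pieces of state only through a single pair of methods. This is a hedged sketch and not the code merged in this PR: the class name, `park`, `unpark`, the `Runnable` stand-in for the read op, and the use of `synchronized` are all assumptions made for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical cursor that owns both the waiting op and its queue membership. */
final class CursorOwnedStateSketch {
    // Stands in for managedledger.waitingCursors.
    private final Queue<CursorOwnedStateSketch> waitingCursors;
    // Stands in for cursor.waitingReadOp; a Runnable replaces OpReadEntry here.
    private Runnable waitingReadOp;

    CursorOwnedStateSketch(Queue<CursorOwnedStateSketch> waitingCursors) {
        this.waitingCursors = waitingCursors;
    }

    /** The only place that sets the op; it always enqueues the cursor too. */
    synchronized boolean park(Runnable readOp) {
        if (waitingReadOp != null) {
            return false; // at most one pending read
        }
        waitingReadOp = readOp;
        waitingCursors.add(this);
        return true;
    }

    /** The only place that clears the op; it always dequeues the cursor too. */
    private synchronized Runnable unpark() {
        Runnable op = waitingReadOp;
        if (op != null) {
            waitingReadOp = null;
            waitingCursors.remove(this);
        }
        return op;
    }

    /** Cancel path (compare Issue 1 and Issue 2). */
    void cancelPendingRead() {
        unpark();
    }

    /** Notification path: run the parked read, if any, outside the lock. */
    void notifyEntriesAvailable() {
        Runnable op = unpark();
        if (op != null) {
            op.run();
        }
    }

    /** Double-check path (compare Issue 3). */
    void checkForNewEntries(boolean hasMoreEntries) {
        if (hasMoreEntries) {
            notifyEntriesAvailable();
        }
    }

    public static void main(String[] args) {
        Queue<CursorOwnedStateSketch> queue = new ConcurrentLinkedQueue<>();
        CursorOwnedStateSketch cursor = new CursorOwnedStateSketch(queue);
        cursor.park(() -> System.out.println("read triggered"));
        cursor.checkForNewEntries(true);
        System.out.println("queue size: " + queue.size()); // prints queue size: 0
    }
}
```

Because every path goes through `unpark`, no caller can clear the op and leave the queue entry behind, and an op is handed out at most once.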

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added this to the 4.1.0 milestone Jul 23, 2025
@poorbarcode poorbarcode self-assigned this Jul 23, 2025
@poorbarcode poorbarcode added the type/bug and release/4.0.6 labels Jul 23, 2025
@github-actions github-actions bot added the doc-not-needed label Jul 23, 2025
@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

@codecov-commenter

Codecov Report

Attention: Patch coverage is 75.00000% with 15 lines in your changes missing coverage. Please review.

Project coverage is 74.31%. Comparing base (bbc6224) to head (b5cd5e5).
Report is 1222 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| ...che/bookkeeper/mledger/impl/ManagedCursorImpl.java | 74.57% | 10 Missing and 5 partials ⚠️ |
Additional details and impacted files

@@             Coverage Diff              @@
##             master   #24551      +/-   ##
============================================
+ Coverage     73.57%   74.31%   +0.74%     
- Complexity    32624    32967     +343     
============================================
  Files          1877     1874       -3     
  Lines        139502   146282    +6780     
  Branches      15299    16777    +1478     
============================================
+ Hits         102638   108710    +6072     
- Misses        28908    28939      +31     
- Partials       7956     8633     +677     
| Flag | Coverage Δ |
| --- | --- |
| inttests | 26.70% <61.66%> (+2.12%) ⬆️ |
| systests | 23.31% <66.66%> (-1.01%) ⬇️ |
| unittests | 73.81% <71.66%> (+0.97%) ⬆️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Files with missing lines | Coverage Δ |
| --- | --- |
| ...che/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 81.35% <100.00%> (+0.69%) ⬆️ |
| ...ker/service/persistent/PersistentSubscription.java | 76.75% <ø> (+0.05%) ⬆️ |
| ...che/bookkeeper/mledger/impl/ManagedCursorImpl.java | 78.68% <74.57%> (-0.62%) ⬇️ |

... and 1106 files with indirect coverage changes

Member

@lhotari lhotari left a comment

LGTM

@lhotari lhotari merged commit 48dc9ec into apache:master Jul 25, 2025
52 checks passed
lhotari pushed a commit that referenced this pull request Jul 25, 2025
…e a recycled OpReadEntry incorrectly (#24551)

(cherry picked from commit 48dc9ec)
lhotari added a commit to lhotari/pulsar that referenced this pull request Jul 26, 2025
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Jul 28, 2025
…e a recycled OpReadEntry incorrectly (apache#24551)

(cherry picked from commit 48dc9ec)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 28, 2025
…e a recycled OpReadEntry incorrectly (apache#24551)

(cherry picked from commit 48dc9ec)
(cherry picked from commit f68bf2b)
priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 28, 2025
…e a recycled OpReadEntry incorrectly (apache#24551)

(cherry picked from commit 48dc9ec)
(cherry picked from commit abf511b)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 29, 2025
…e a recycled OpReadEntry incorrectly (apache#24551)

(cherry picked from commit 48dc9ec)
(cherry picked from commit abf511b)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 30, 2025
…e a recycled OpReadEntry incorrectly (apache#24551)

(cherry picked from commit 48dc9ec)
(cherry picked from commit abf511b)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 31, 2025
…e a recycled OpReadEntry incorrectly (apache#24551)

(cherry picked from commit 48dc9ec)
(cherry picked from commit f68bf2b)
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025