Skip to content

Conversation

@3pacccccc
Copy link
Contributor

Fixes #24906

Motivation

the test log is: https://gist.github.com/3pacccccc/7050810fa187f17046d826e7971c174a
From the error logs, we can see the exact failure:

java.util.ConcurrentModificationException: null
	at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013) ~[?:?]
	at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967) ~[?:?]
	at org.apache.bookkeeper.client.PulsarMockLedgerHandle.getLength(PulsarMockLedgerHandle.java:235) ~[testmocks-4.2.0-SNAPSHOT.jar:4.17.2]
	at org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.getEstimatedEntrySize(RangeEntryCacheImpl.java:503) ~[managed-ledger-4.2.0-SNAPSHOT.jar:4.2.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.asyncReadEntriesByPosition(RangeEntryCacheImpl.java:310) ~[managed-ledger-4.2.0-SNAPSHOT.jar:4.2.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.asyncReadEntry0(RangeEntryCacheImpl.java:286) ~[managed-ledger-4.2.0-SNAPSHOT.jar:4.2.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.asyncReadEntry(RangeEntryCacheImpl.java:268) ~[managed-ledger-4.2.0-SNAPSHOT.jar:4.2.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:2438) ~[managed-ledger-4.2.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalReadFromLedger(ManagedLedgerImpl.java:2408) ~[managed-ledger-4.2.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntries(ManagedLedgerImpl.java:2089) ~[managed-ledger-4.2.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesWithSkip(ManagedCursorImpl.java:918) ~[managed-ledger-4.2.0-SNAPSHOT.jar:?]
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesWithSkipOrWait(ManagedCursorImpl.java:1071) ~[managed-ledger-4.2.0-SNAPSHOT.jar:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:430) ~[classes/:?]
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readMoreEntriesAsync$4(PersistentDispatcherMultipleConsumers.java:324) ~[classes/:?]

The root cause analysis shows:

  1. Thread safety issue: The entries list in PulsarMockLedgerHandle was implemented as an ArrayList, which is not thread-safe
  2. Concurrent access: Multiple threads were concurrently reading from and potentially modifying the entries list during test execution
  3. Cache invalidation: When the ConcurrentModificationException occurred, it caused all caches to be removed, making all subsequent read operations fail1
  4. Complete read failure: With the cache cleared and the test's interceptor configured to fail all BookKeeper reads (to enforce cache-only behavior), all subsequent read operations failed with NonRecoverableLedgerException: Should not read from BK since cache should be used2

Modifications

Replace ArrayList with CopyOnWriteArrayList for the entries field in PulsarMockLedgerHandle

Verifying this change

  • Make sure that the change passes the CI checks.

  • Dependencies (add or upgrade a dependency)

  • The public API

  • The schema

  • The default values of configurations

  • The threading model

  • The binary protocol

  • The REST endpoints

  • The admin CLI options

  • The metrics

  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: 3pacccccc#30

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Nov 6, 2025
lhotari
lhotari previously approved these changes Nov 6, 2025
@lhotari
Copy link
Member

lhotari commented Nov 6, 2025

This change causes a test to fail:

  Error:  Tests run: 21, Failures: 1, Errors: 0, Skipped: 6, Time elapsed: 1189 s <<< FAILURE! -- in org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderTest
  Error:  org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderTest.testReadHandlerState -- Time elapsed: 60.01 s <<< FAILURE!
  org.testng.internal.thread.ThreadTimeoutException: Method org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderTest.testReadHandlerState() didn't finish within the time-out 60000
  	at java.base/java.util.Arrays.copyOf(Arrays.java:3482)
  	at java.base/java.util.concurrent.CopyOnWriteArrayList.add(CopyOnWriteArrayList.java:465)
  	at org.apache.bookkeeper.client.PulsarMockLedgerHandle.addEntry(PulsarMockLedgerHandle.java:162)
  	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderBase.buildReadHandle(BlobStoreManagedLedgerOffloaderBase.java:160)
  	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloaderTest.testReadHandlerState(BlobStoreManagedLedgerOffloaderTest.java:179)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)

perhaps it adds too much overhead for that test.

@lhotari
Copy link
Member

lhotari commented Nov 6, 2025

I think we should simply use Collections.synchronizedList(new ArrayList<>()) instead, however it's possible that it introduces deadlocks.

Another optimization would be to add a counter to replace the iteration:

@Override
public long getLength() {
long length = 0;
for (LedgerEntryImpl entry : entries) {
length += entry.getLength();
}
return length;
}

@3pacccccc
Copy link
Contributor Author

I think we should simply use Collections.synchronizedList(new ArrayList<>()) instead, however it's possible that it introduces deadlocks.

Another optimization would be to add a counter to replace the iteration:

@Override
public long getLength() {
long length = 0;
for (LedgerEntryImpl entry : entries) {
length += entry.getLength();
}
return length;
}

@lhotari changed to Collections.synchronizedList(new ArrayList<>()), please take a look

@lhotari
Copy link
Member

lhotari commented Nov 7, 2025

@lhotari changed to Collections.synchronizedList(new ArrayList<>()), please take a look

It's necessary to implement

@Override
public long getLength() {
long length = 0;
for (LedgerEntryImpl entry : entries) {
length += entry.getLength();
}
return length;
}
using a counter. The ConcurrentModificationException won't get fixed with synchronized list. It's not a large change to do so.

@lhotari lhotari dismissed their stale review November 7, 2025 07:46

PulsarMockLedgerHandle.getLength would need to be implemented using a counter where the counter value is incremented when entries are added.

@3pacccccc
Copy link
Contributor Author

@lhotari changed to Collections.synchronizedList(new ArrayList<>()), please take a look

It's necessary to implement

@Override
public long getLength() {
long length = 0;
for (LedgerEntryImpl entry : entries) {
length += entry.getLength();
}
return length;
}

using a counter. The ConcurrentModificationException won't get fixed with synchronized list. It's not a large change to do so.

@lhotari sure, I'll add a counter for this

@codecov-commenter
Copy link

codecov-commenter commented Nov 7, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.23%. Comparing base (7c343d0) to head (62ff04c).
⚠️ Report is 6 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24955      +/-   ##
============================================
- Coverage     74.24%   74.23%   -0.01%     
+ Complexity    33911    33604     -307     
============================================
  Files          1913     1920       +7     
  Lines        149510   150062     +552     
  Branches      17373    17404      +31     
============================================
+ Hits         110997   111405     +408     
- Misses        29685    29772      +87     
- Partials       8828     8885      +57     
Flag Coverage Δ
inttests 26.25% <ø> (-0.04%) ⬇️
systests 22.83% <ø> (+0.11%) ⬆️
unittests 73.77% <ø> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.
see 108 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@3pacccccc 3pacccccc requested a review from lhotari November 9, 2025 14:35
@lhotari lhotari requested a review from Copilot November 9, 2025 17:02
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR optimizes the getLength() method in mock BookKeeper classes by maintaining a running counter instead of iterating through entries on each call. The changes also add thread-safety by wrapping the entries list in a synchronized collection.

Key changes:

  • Introduced AtomicLong totalLengthCounter to track cumulative entry lengths
  • Replaced ArrayList with Collections.synchronizedList for thread-safe entry storage
  • Updated getLength() implementations to return the counter value directly

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
PulsarMockReadHandle.java Added totalLengthCounter parameter and updated getLength() to use the counter
PulsarMockLedgerHandle.java Introduced counter field, made entries list thread-safe, and updated entry addition logic to maintain the counter
PulsarMockBookKeeper.java Updated read handle creation to pass counter reference and added counter reset in shutdown

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@Technoboy- Technoboy- merged commit aeb1bd1 into apache:master Nov 11, 2025
96 of 100 checks passed
@3pacccccc 3pacccccc deleted the fix-flaky-test branch November 11, 2025 07:23
lhotari pushed a commit that referenced this pull request Nov 11, 2025
…QueueReadsGettingCached (#24955)

(cherry picked from commit aeb1bd1)
lhotari pushed a commit that referenced this pull request Nov 11, 2025
…QueueReadsGettingCached (#24955)

(cherry picked from commit aeb1bd1)
lhotari pushed a commit that referenced this pull request Nov 11, 2025
…QueueReadsGettingCached (#24955)

(cherry picked from commit aeb1bd1)
lhotari pushed a commit that referenced this pull request Nov 11, 2025
…QueueReadsGettingCached (#24955)

(cherry picked from commit aeb1bd1)
nodece pushed a commit to nodece/pulsar that referenced this pull request Nov 12, 2025
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…QueueReadsGettingCached (apache#24955)

(cherry picked from commit aeb1bd1)
(cherry picked from commit 6d07787)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…QueueReadsGettingCached (apache#24955)

(cherry picked from commit aeb1bd1)
(cherry picked from commit 6d07787)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…QueueReadsGettingCached (apache#24955)

(cherry picked from commit aeb1bd1)
(cherry picked from commit 421d646)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 14, 2025
…QueueReadsGettingCached (apache#24955)

(cherry picked from commit aeb1bd1)
(cherry picked from commit 421d646)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky-test: KeySharedSubscriptionBrokerCacheTest.testReplayQueueReadsGettingCached

4 participants