[improve][broker] Optimize fine-grained concurrency control for BucketDelayedDeliveryTracker #24739
…rategy Refactor lock mechanism from StampedLock to ReentrantReadWriteLock for thread safety. Add async bucket snapshot creation and improve concurrent message handling.
…yncMergeBucketSnapshot
@codelipenghui @lhotari @coderzc @Apurva007

Without benchmarking, it will be hard to validate assumptions. I'd suggest adding JMH benchmarks so that performance can be compared. JMH benchmarks can be added to the microbench module: https://github.com/apache/pulsar/tree/master/microbench

OK. I will add JMH benchmarks first.
…cketDelayedDeliveryTracker
Bump: this PR can be reviewed now.

Bump.
Pull request overview
This PR optimizes the concurrency control mechanism in BucketDelayedDeliveryTracker by replacing StampedLock with ReentrantReadWriteLock and introducing asynchronous bucket snapshot creation. The changes aim to reduce lock contention, eliminate blocking I/O from the publish path, and improve thread safety.
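To make the "asynchronous bucket snapshot creation" idea concrete, here is a minimal sketch, not the PR's actual code. It assumes a tracker that collects delivery timestamps into an in-memory bucket, seals the bucket when it is full, and hands the sealed bucket to a dedicated single-thread executor so the caller never waits on storage. The class name `AsyncSnapshotSketch`, the `persist` stand-in, and the use of a plain `synchronized` block are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch; names are illustrative and not taken from the PR. */
class AsyncSnapshotSketch implements AutoCloseable {
    // A single thread means snapshot tasks run one at a time, in submission order.
    private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
    private final Object lock = new Object();
    private final int maxPerBucket;
    private List<Long> openBucket = new ArrayList<>();

    AsyncSnapshotSketch(int maxPerBucket) {
        this.maxPerBucket = maxPerBucket;
    }

    /** Adds a timestamp; returns a future only when this call sealed a full bucket. */
    Optional<CompletableFuture<Integer>> add(long deliverAt) {
        final List<Long> sealed;
        synchronized (lock) {
            openBucket.add(deliverAt);
            if (openBucket.size() < maxPerBucket) {
                return Optional.empty();
            }
            sealed = openBucket;            // seal the full bucket
            openBucket = new ArrayList<>(); // swap in a fresh one
        }
        // The slow part runs outside the lock, on the dedicated thread.
        return Optional.of(CompletableFuture.supplyAsync(() -> persist(sealed), snapshotExecutor));
    }

    /** Stand-in for a storage write; returns the number of entries "persisted". */
    private int persist(List<Long> bucket) {
        return bucket.size();
    }

    @Override
    public void close() {
        snapshotExecutor.shutdown();
    }
}
```

The point of the sketch is that the lock is held only for the in-memory swap; anything that could block happens on the executor thread.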
Key Changes
- Concurrency improvements: Replaced `StampedLock` with `ReentrantReadWriteLock` for read/write separation, and implemented a "read-lock check → write-lock modify" pattern in `addMessage()`
- Asynchronous operations: Bucket snapshots are now created on a dedicated single-thread executor (`bucketSnapshotExecutor`), preventing blocking I/O in critical paths
- Decoupling: Introduced `DelayedDeliveryContext` interface with `DispatcherDelayedDeliveryContext` and `NoopDelayedDeliveryContext` implementations to decouple trackers from dispatchers for easier testing and benchmarking
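The "read-lock check → write-lock modify" pattern can be sketched as follows. This is an illustration under assumptions, not the tracker's real `addMessage()`: the class `ReadCheckWriteModifySketch` and its `Set<Long>` state are hypothetical. One detail worth showing is that `ReentrantReadWriteLock` does not support upgrading a read lock to a write lock, so the read lock must be released first and the condition re-checked under the write lock.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the pattern; not the PR's implementation. */
class ReadCheckWriteModifySketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<Long> tracked = new HashSet<>();

    /** Returns true if the id was newly added, false if it was already tracked. */
    boolean addMessage(long id) {
        // Fast path: duplicates are rejected under the shared read lock.
        lock.readLock().lock();
        try {
            if (tracked.contains(id)) {
                return false;
            }
        } finally {
            lock.readLock().unlock();
        }
        // No lock upgrade is possible, so take the write lock separately.
        lock.writeLock().lock();
        try {
            // Re-check: another thread may have added the id in between.
            return tracked.add(id);
        } finally {
            lock.writeLock().unlock();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return tracked.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Calls that hit the fast path never contend for the exclusive lock, which is where the reduced contention would come from when duplicates are common.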
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| `BucketDelayedDeliveryTracker.java` | Core concurrency changes: replaced StampedLock with ReentrantReadWriteLock, added async snapshot executor, implemented seal-and-swap pattern for bucket creation |
| `AbstractDelayedDeliveryTracker.java` | Added timerStateLock for timer operations, introduced scheduleImmediateRun(), replaced dispatcher references with DelayedDeliveryContext |
| `DelayedDeliveryContext.java` | New interface defining context operations (getName, getCursor, triggerReadMoreEntries) |
| `DispatcherDelayedDeliveryContext.java` | Production implementation wrapping dispatcher with synchronized triggerReadMoreEntries() |
| `NoopDelayedDeliveryContext.java` | Test/benchmark implementation with no-op trigger method |
| `BucketDelayedDeliveryTrackerThreadSafetyTest.java` | Comprehensive thread safety tests covering concurrent reads/writes, deadlock detection, clear/close operations, and mixed workloads |
| `BucketDelayedDeliveryTrackerBenchmark.java` | New JMH benchmarks for testing ReentrantReadWriteLock performance under various read/write ratios and contention levels |
| `MockBucketSnapshotStorage.java` | Mock storage implementation for benchmarking without I/O overhead |
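The decoupling described in the table can be illustrated with a small sketch. Everything here is hypothetical: `SketchDeliveryContext` only mirrors the `getName` and `triggerReadMoreEntries` operations named above (`getCursor` is left out because it depends on Pulsar types), and `SketchTracker` is a toy stand-in for a tracker, not the real class.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, reduced stand-in for the context interface described above. */
interface SketchDeliveryContext {
    String getName();
    void triggerReadMoreEntries();
}

/** No-op variant, as one might use in a benchmark with no dispatcher. */
final class NoopSketchContext implements SketchDeliveryContext {
    private final String name;
    NoopSketchContext(String name) { this.name = name; }
    @Override public String getName() { return name; }
    @Override public void triggerReadMoreEntries() { /* intentionally empty */ }
}

/** Counting variant, so a test can assert how often the tracker called back. */
final class CountingSketchContext implements SketchDeliveryContext {
    final AtomicInteger triggers = new AtomicInteger();
    private final String name;
    CountingSketchContext(String name) { this.name = name; }
    @Override public String getName() { return name; }
    @Override public void triggerReadMoreEntries() { triggers.incrementAndGet(); }
}

/** Toy tracker that depends only on the context, never on a dispatcher. */
final class SketchTracker {
    private final SketchDeliveryContext context;
    private long earliestDeliverAt = Long.MAX_VALUE;

    SketchTracker(SketchDeliveryContext context) { this.context = context; }

    void add(long deliverAt) {
        earliestDeliverAt = Math.min(earliestDeliverAt, deliverAt);
    }

    /** Notifies the context if something is due at {@code now}; returns whether it did. */
    boolean runIfDue(long now) {
        if (earliestDeliverAt > now) {
            return false;
        }
        earliestDeliverAt = Long.MAX_VALUE;
        context.triggerReadMoreEntries();
        return true;
    }
}
```

Because the tracker sees only the interface, a test or benchmark can swap in the counting or no-op variant without constructing a dispatcher.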
@BewareMyPower Copilot's review comments made sense; I re-examined them and have made the changes. PTAL.
…ssage addition in benchmarks
…rameters for memory management
@codelipenghui @lhotari @coderzc @Apurva007 @BewareMyPower @thetumbled @dao-jun Updated the JMH test results.

java -Xmx4g -jar microbench/target/microbenchmarks.jar \
-p maxAdditionalUniqueMessages=5000000 \
-rf json -rff jmh-result-$(date +%s).json \
".*BucketDelayedDeliveryTrackerBenchmark.benchmarkConcurrentAddMessage.*" \
| tee jmh-result-$(date +%s).txt

In the current However, delayed-message workloads are not “pure read” – they are a mix of:
So I focused on the mixed-operation benchmarks, which better reflect real usage. From the JMH results:
So the trade-off is:
https://gist.github.com/Denovo1998/d470ea939fc9337523545525539b8f5f
https://gist.github.com/Denovo1998/02075b374a5f00771ac6d32809ac4036
…or shutdown process




Fixes #24603
Main Issue: #24600
PIP: #xyz
Motivation
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete

Matching PR in forked repository
PR in forked repository: Denovo1998#10