[ISSUE #1737] 💥 Implement QueueLockManager function 🚀 #1738

rocketmq-rust-bot merged 3 commits into `main` from
Conversation
Walkthrough

The changes introduce a new structure called `QueueLockManager`.
🔊 @mxsm 🚀 Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)
150-152: Consider renaming `expired_local_cache` to `lock_cache` for clarity

The field `expired_local_cache` in `QueueLockManager` may not clearly represent its purpose. Since it stores all active locks (not just expired ones), consider renaming it to `lock_cache` or `lock_map` to improve code readability.
161-174: Optimize `build_lock_key` to return `CheetahString` directly

Currently, `build_lock_key` returns a `String`, which is then converted to a `CheetahString` in the `try_lock` and `unlock` methods. To avoid unnecessary conversions and improve performance, consider modifying `build_lock_key` to return a `CheetahString` directly.

Apply this diff to modify the function:

```diff
 pub fn build_lock_key(
     topic: &CheetahString,
     consumer_group: &CheetahString,
     queue_id: i32,
 ) -> CheetahString {
-    format!(
+    CheetahString::from_string(format!(
         "{}{}{}{}{}",
         topic,
         PopAckConstants::SPLIT,
         consumer_group,
         PopAckConstants::SPLIT,
         queue_id
-    )
+    ))
 }
```
224-225: Update log output to reflect actual lock count after cleanup

After modifying `clean_unused_locks` to return the number of remaining locks, adjust the log statement to accurately report the current lock cache size.

Apply this diff to update the log:

```diff
 let count = self.clean_unused_locks(60000).await;
-info!("QueueLockSize={}", count);
+info!("QueueLockSize after cleanup={}", count);
```
150-229: Add documentation comments to public methods

Adding documentation comments to the public methods of `QueueLockManager` will enhance code readability and maintainability, helping other developers understand their purpose and usage.
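As a hypothetical illustration of the style such comments might take, the sketch below documents a key-building function shaped like the PR's `build_lock_key`. It is self-contained, so plain `String` stands in for `CheetahString` and a local `SPLIT` constant stands in for `PopAckConstants::SPLIT` (whose actual value is an assumption here):

```rust
/// Builds the cache key that identifies the lock for one queue.
///
/// The key concatenates the topic, consumer group, and queue id with a
/// separator, e.g. `test_topic@test_group@1`. In the broker crate the
/// separator comes from `PopAckConstants::SPLIT`; `@` is assumed here.
fn build_lock_key(topic: &str, consumer_group: &str, queue_id: i32) -> String {
    const SPLIT: &str = "@"; // stand-in for PopAckConstants::SPLIT
    format!("{}{}{}{}{}", topic, SPLIT, consumer_group, SPLIT, queue_id)
}

fn main() {
    let key = build_lock_key("test_topic", "test_group", 1);
    println!("{}", key);
}
```

Rustdoc comments like these surface in `cargo doc` output and IDE hovers, which is where they pay off for downstream users of the broker crate.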
```rust
let mut cache = self.expired_local_cache.lock().await;
let lock = cache.entry(key).or_insert(TimedLock::new());
lock.try_lock()
}
```
🛠️ Refactor suggestion
Consider using a concurrent DashMap to reduce contention
Using a Mutex to guard the entire expired_local_cache may cause contention in high-concurrency scenarios. Consider replacing it with a thread-safe concurrent map like DashMap to improve performance.
Apply these changes to use DashMap:
1. Add the dependency in `Cargo.toml`:

   ```toml
   [dependencies]
   dashmap = "5.3"
   ```

2. Update the import statement:

   ```diff
   -use tokio::sync::Mutex;
   +use dashmap::DashMap;
   ```

3. Modify the `QueueLockManager` struct:

   ```diff
    pub struct QueueLockManager {
   -    expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
   +    expired_local_cache: Arc<DashMap<CheetahString, TimedLock>>,
    }
   ```

4. Update methods to use `DashMap` APIs without locking:

   ```diff
   -let mut cache = self.expired_local_cache.lock().await;
   -let lock = cache.entry(key).or_insert(TimedLock::new());
   +let lock = self.expired_local_cache.entry(key).or_insert_with(TimedLock::new);
   ```

   Adjust other methods similarly to eliminate explicit locking.
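If adding the `dashmap` dependency is undesirable, the same contention reduction can be approximated with the standard library by sharding the map across several independently locked shards, which is essentially what `DashMap` does internally. This is a hypothetical std-only sketch, not the PR's code; a `bool` stands in for `TimedLock`:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Mutex;

const SHARDS: usize = 16;

/// A minimal sharded lock table: each key hashes to one of SHARDS
/// independently locked maps, so unrelated keys rarely contend.
struct ShardedLockTable {
    shards: Vec<Mutex<HashMap<String, bool>>>, // bool stands in for TimedLock
}

impl ShardedLockTable {
    fn new() -> Self {
        ShardedLockTable {
            shards: (0..SHARDS).map(|_| Mutex::new(HashMap::new())).collect(),
        }
    }

    fn shard_for(&self, key: &str) -> &Mutex<HashMap<String, bool>> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        &self.shards[(hasher.finish() as usize) % SHARDS]
    }

    /// Returns true only for the first caller to lock a given key.
    fn try_lock(&self, key: &str) -> bool {
        let mut map = self.shard_for(key).lock().unwrap();
        let locked = map.entry(key.to_string()).or_insert(false);
        if *locked {
            false
        } else {
            *locked = true;
            true
        }
    }

    fn unlock(&self, key: &str) {
        let mut map = self.shard_for(key).lock().unwrap();
        if let Some(locked) = map.get_mut(key) {
            *locked = false;
        }
    }
}

fn main() {
    let table = ShardedLockTable::new();
    assert!(table.try_lock("topic@group@1"));
    assert!(!table.try_lock("topic@group@1"));
    table.unlock("topic@group@1");
    assert!(table.try_lock("topic@group@1"));
    println!("sharded try_lock/unlock behaved as expected");
}
```

The trade-off versus a single `Mutex<HashMap>` is the same one `DashMap` makes: per-shard locks shrink the critical section at the cost of losing a single consistent view of the whole map.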
```rust
pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
    let mut cache = self.expired_local_cache.lock().await;
    let mut count = 0;
    cache.retain(|_, lock| {
        count += 1;
        get_current_millis() - lock.get_lock_time() <= used_expire_millis
    });
    count
}
```
Fix clean_unused_locks count to reflect remaining locks
In the clean_unused_locks method, the count variable represents the total number of processed locks, not the number of remaining locks after cleanup. This can be misleading in logs and tests. Modify the function to return the number of remaining locks instead.
Apply this diff to fix the issue:
```diff
 pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
     let mut cache = self.expired_local_cache.lock().await;
-    let mut count = 0;
     cache.retain(|_, lock| {
-        count += 1;
         get_current_millis() - lock.get_lock_time() <= used_expire_millis
     });
-    count
+    cache.len() as i32
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> i32 {
    let mut cache = self.expired_local_cache.lock().await;
    cache.retain(|_, lock| {
        get_current_millis() - lock.get_lock_time() <= used_expire_millis
    });
    cache.len() as i32
}
```
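The behavioral difference the reviewer highlights — counting entries processed by `retain` versus entries remaining afterwards — can be demonstrated with a plain `HashMap`. This is a self-contained sketch: fixed timestamps stand in for `get_current_millis`, and the map values stand in for `TimedLock` lock times:

```rust
use std::collections::HashMap;

fn main() {
    // key -> lock timestamp in millis (stand-in for TimedLock::get_lock_time)
    let mut cache: HashMap<&str, u64> = HashMap::new();
    cache.insert("fresh", 1_000);
    cache.insert("stale", 0);

    let now: u64 = 1_500;
    let expire_millis: u64 = 600;

    // Counting inside the retain closure tallies *processed* entries...
    let mut processed = 0;
    cache.retain(|_, lock_time| {
        processed += 1;
        now - *lock_time <= expire_millis
    });

    // ...whereas len() after retain is the number of *remaining* locks.
    assert_eq!(processed, 2); // both entries were visited
    assert_eq!(cache.len(), 1); // only "fresh" survived cleanup
    println!("processed={}, remaining={}", processed, cache.len());
}
```

Logging the post-`retain` length is what makes a "QueueLockSize" metric meaningful for monitoring cache growth.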
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #1738      +/-   ##
==========================================
+ Coverage   28.11%   28.22%   +0.11%
==========================================
  Files         466      466
  Lines       64277    64388     +111
==========================================
+ Hits        18071    18175     +104
- Misses      46206    46213       +7
```

☔ View full report in Codecov by Sentry.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
rocketmq-broker/src/processor/pop_message_processor.rs (2)
220-228: Make cleanup interval and expiry time configurable

The background task uses hardcoded values for:

- Cleanup interval: 60 seconds
- Lock expiry time: 60000 milliseconds

Consider making these values configurable through the `QueueLockManager` constructor or environment variables.

```diff
 pub struct QueueLockManager {
     expired_local_cache: Arc<Mutex<HashMap<CheetahString, TimedLock>>>,
+    cleanup_interval_secs: u64,
+    lock_expiry_millis: u64,
 }

 impl QueueLockManager {
-    pub fn new() -> Self {
+    pub fn new(cleanup_interval_secs: Option<u64>, lock_expiry_millis: Option<u64>) -> Self {
         QueueLockManager {
             expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(4096))),
+            cleanup_interval_secs: cleanup_interval_secs.unwrap_or(60),
+            lock_expiry_millis: lock_expiry_millis.unwrap_or(60000),
         }
     }

     pub fn start(self: Arc<Self>) {
         tokio::spawn(async move {
             loop {
-                tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-                let count = self.clean_unused_locks(60000).await;
+                tokio::time::sleep(tokio::time::Duration::from_secs(self.cleanup_interval_secs)).await;
+                let count = self.clean_unused_locks(self.lock_expiry_millis).await;
                 info!("QueueLockSize={}", count);
             }
         });
     }
 }
```
331-390: Improve test coverage and naming conventions

Test naming:

- Remove inconsistent "1" suffixes (e.g., `try_lock_locks_successfully1`).
- Use descriptive names that indicate the scenario being tested.

Missing test cases:

- Concurrent access scenarios
- Edge cases for expired locks
- Cleanup task behavior

Example of a concurrent test:

```rust
#[tokio::test]
async fn concurrent_lock_attempts_are_thread_safe() {
    let manager = Arc::new(QueueLockManager::new());
    let topic = CheetahString::from_static_str("test_topic");
    let consumer_group = CheetahString::from_static_str("test_group");
    let queue_id = 1;

    let mut handles = vec![];
    for _ in 0..10 {
        let manager = manager.clone();
        let topic = topic.clone();
        let consumer_group = consumer_group.clone();
        handles.push(tokio::spawn(async move {
            manager.try_lock(&topic, &consumer_group, queue_id).await
        }));
    }

    let results = futures::future::join_all(handles).await;
    let successful_locks = results.into_iter().filter(|r| *r.as_ref().unwrap()).count();
    assert_eq!(successful_locks, 1);
}
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- `rocketmq-broker/src/processor/pop_message_processor.rs` (4 hunks)
🔇 Additional comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)
150-158: 🛠️ Refactor suggestion
Consider using DashMap and documenting capacity choice
As mentioned in previous reviews, using DashMap instead of Arc<Mutex<HashMap>> would provide better performance in concurrent scenarios by reducing lock contention. Additionally, please document why 4096 was chosen as the initial capacity.
210-218:
Fix count to reflect remaining locks
As mentioned in previous reviews, the count should reflect the number of remaining locks after cleanup, not the number of processed locks.
220-228: 🛠️ Refactor suggestion
Add error handling for background cleanup task
The background task should handle and log any errors that occur during cleanup to prevent silent failures.
```diff
 pub fn start(self: Arc<Self>) {
     tokio::spawn(async move {
         loop {
             tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-            let count = self.clean_unused_locks(60000).await;
-            info!("QueueLockSize={}", count);
+            match self.clean_unused_locks(60000).await {
+                Ok(count) => info!("QueueLockSize={}", count),
+                Err(e) => error!("Failed to clean unused locks: {}", e),
+            }
         }
     });
 }
```

Likely invalid or redundant comment.
187-191: 🛠️ Refactor suggestion
Optimize mutex guard duration
The mutex guard is held for the entire duration of the function. Consider releasing it earlier by moving the try_lock() call outside the guard:
```diff
 pub async fn try_lock_with_key(&self, key: CheetahString) -> bool {
-    let mut cache = self.expired_local_cache.lock().await;
-    let lock = cache.entry(key).or_insert(TimedLock::new());
-    lock.try_lock()
+    let lock = {
+        let mut cache = self.expired_local_cache.lock().await;
+        cache.entry(key).or_insert(TimedLock::new()).clone()
+    };
+    lock.try_lock()
 }
```

Likely invalid or redundant comment.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)
156-158: Consider making initial capacity configurable

The fixed capacity of 4096 might not be suitable for all use cases. Consider making it configurable through a constructor parameter.

```diff
-pub fn new() -> Self {
+pub fn new(initial_capacity: usize) -> Self {
     QueueLockManager {
-        expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(4096))),
+        expired_local_cache: Arc::new(Mutex::new(HashMap::with_capacity(initial_capacity))),
     }
 }
```
161-174: Optimize string key generation

The current implementation creates intermediate `String` allocations. Consider using a pre-allocated buffer for better performance.

```diff
 pub fn build_lock_key(
     topic: &CheetahString,
     consumer_group: &CheetahString,
     queue_id: i32,
-) -> String {
-    format!(
-        "{}{}{}{}{}",
-        topic,
-        PopAckConstants::SPLIT,
-        consumer_group,
-        PopAckConstants::SPLIT,
-        queue_id
-    )
+) -> CheetahString {
+    let mut buffer = String::with_capacity(
+        topic.len() + consumer_group.len() + 10
+    );
+    buffer.push_str(topic.as_str());
+    buffer.push_str(PopAckConstants::SPLIT);
+    buffer.push_str(consumer_group.as_str());
+    buffer.push_str(PopAckConstants::SPLIT);
+    buffer.push_str(&queue_id.to_string());
+    CheetahString::from(buffer)
 }
```
217-225: Make cleanup interval configurable

The background task's interval is hardcoded to 60 seconds. Consider making it configurable to allow for different cleanup strategies.

```diff
-pub fn start(self: Arc<Self>) {
+pub fn start(self: Arc<Self>, cleanup_interval_secs: u64) {
     tokio::spawn(async move {
         loop {
-            tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+            tokio::time::sleep(tokio::time::Duration::from_secs(cleanup_interval_secs)).await;
             let count = self.clean_unused_locks(60000).await;
             info!("QueueLockSize={}", count);
         }
     });
 }
```
345-362: Improve test naming consistency and coverage

- Test names have inconsistent suffixes (e.g., `try_lock_locks_successfully1`).
- Missing tests for concurrent access scenarios.

Consider:

- Removing numeric suffixes from test names
- Adding concurrent access tests:

```rust
#[tokio::test]
async fn try_lock_handles_concurrent_access() {
    let manager = Arc::new(QueueLockManager::new());
    let topic = CheetahString::from_static_str("test_topic");
    let consumer_group = CheetahString::from_static_str("test_group");
    let queue_id = 1;

    let mut handles = vec![];
    for _ in 0..10 {
        let manager = manager.clone();
        let topic = topic.clone();
        let consumer_group = consumer_group.clone();
        handles.push(tokio::spawn(async move {
            manager.try_lock(&topic, &consumer_group, queue_id).await
        }));
    }

    let results = futures::future::join_all(handles).await;
    assert_eq!(results.into_iter().filter(|r| *r.as_ref().unwrap()).count(), 1);
}
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)

- `rocketmq-broker/src/processor/pop_message_processor.rs` (4 hunks)
🔇 Additional comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)
17-17: 🛠️ Refactor suggestion
Consider using DashMap for better concurrency
Using a Mutex to guard the entire HashMap may cause contention in high-concurrency scenarios. Consider using dashmap::DashMap instead.
Also applies to: 21-21, 33-33
210-215: Fix lock cleanup count logic

The method returns the total count before cleanup instead of the number of remaining locks.

```diff
 pub async fn clean_unused_locks(&self, used_expire_millis: u64) -> usize {
     let mut cache = self.expired_local_cache.lock().await;
-    let count = cache.len();
     cache.retain(|_, lock| get_current_millis() - lock.get_lock_time() <= used_expire_millis);
-    count
+    cache.len()
 }
```

376-387: Update test assertions for clean_unused_locks

The test assertions need to be updated to match the fixed cleanup count logic.
187-191: Verify lock acquisition patterns

Let's verify that there are no potential deadlocks or race conditions in lock usage patterns across the codebase.

✅ Verification successful

Based on the comprehensive search results, I can now generate the final response:

Lock implementation follows safe patterns

The lock implementation in `try_lock_with_key` follows safe patterns:

- Proper lock acquisition using tokio's async mutex
- No nested locks that could cause deadlocks
- Clear unlock paths in the corresponding `unlock_with_key` method
- Comprehensive test coverage validating lock behavior
- Timeout mechanisms to prevent indefinite blocking

The codebase shows consistent lock/unlock patterns across different components, with proper error handling and timeouts.
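The lock/unlock pattern described above centers on the `TimedLock` type, whose implementation lives in the PR rather than in this thread. A plausible std-only sketch of the pattern — an atomic try-lock that records its acquisition time so a background sweeper can evict stale entries — might look like this (the field names and semantics are assumptions, not the PR's code):

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

fn current_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

/// A lock that remembers when it was last acquired, so a cleanup task
/// can evict entries whose lock time is older than a threshold.
struct TimedLock {
    locked: AtomicBool,
    lock_time: AtomicU64,
}

impl TimedLock {
    fn new() -> Self {
        TimedLock {
            locked: AtomicBool::new(false),
            lock_time: AtomicU64::new(current_millis()),
        }
    }

    /// Atomically acquires the lock via compare-and-swap;
    /// only one caller wins until unlock() is called.
    fn try_lock(&self) -> bool {
        let won = self
            .locked
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if won {
            self.lock_time.store(current_millis(), Ordering::Release);
        }
        won
    }

    fn unlock(&self) {
        self.locked.store(false, Ordering::Release);
    }

    fn get_lock_time(&self) -> u64 {
        self.lock_time.load(Ordering::Acquire)
    }
}

fn main() {
    let lock = TimedLock::new();
    assert!(lock.try_lock());
    assert!(!lock.try_lock()); // second attempt fails while held
    lock.unlock();
    assert!(lock.try_lock()); // reacquirable after unlock
    println!("lock_time={}", lock.get_lock_time());
}
```

Because the try-lock is a single compare-and-swap, it never blocks, which matches the timeout-free "try" semantics the review verified.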
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```shell
#!/bin/bash
# Search for all lock acquisitions and releases
echo "Searching for lock acquisition patterns..."
rg -A 5 "try_lock|unlock" --type rust

# Search for potential nested locks
echo "Checking for potential nested locks..."
ast-grep --pattern 'try_lock($_) {
  $$$
  try_lock($_)
  $$$
}'
```
Length of output: 62867
Which Issue(s) This PR Fixes(Closes)
Fixes #1737
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features

- Introduced `QueueLockManager` for improved message processing concurrency control.

Bug Fixes

Tests

- Added tests for `QueueLockManager` to verify lock functionality and cache behavior.