Conversation
Walkthrough

The pull request introduces significant modifications to the RocketMQ client, focusing on enhancing asynchronous capabilities across various components. Key changes include updating the

Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚀 Thanks for your contribution 🎉. CodeRabbit (AI) will review your code first 🔥
Actionable comments posted: 7
🧹 Outside diff range and nitpick comments (5)
rocketmq-remoting/src/protocol/route/topic_route_data.rs (1)
45-51: Approved: Improved functionality of `topic_route_data_changed`

The changes correctly implement the comparison logic for detecting changes in topic route data. By sorting both the current and old data before comparison, we ensure a consistent and accurate result.
Consider a minor optimization: move the sorting operations to a separate method to improve readability and reusability. For example:
```rust
impl TopicRouteData {
    fn sort_data(&mut self) {
        self.queue_datas.sort();
        self.broker_datas.sort();
    }

    pub fn topic_route_data_changed(&self, old_data: Option<&TopicRouteData>) -> bool {
        if old_data.is_none() {
            return true;
        }
        let mut now = TopicRouteData::from_existing(self);
        let mut old = TopicRouteData::from_existing(old_data.unwrap());
        now.sort_data();
        old.sort_data();
        now != old
    }
}
```

This refactoring would make the `topic_route_data_changed` method cleaner and allow reuse of the sorting logic if needed elsewhere.

rocketmq-remoting/src/protocol/route/route_data_view.rs (1)
Line range hint
18-49: Summary: Ordering capabilities added to `BrokerData`

The changes introduce `PartialOrd` and `Ord` trait implementations for `BrokerData`, enabling comparison and sorting based on the `broker_name` field. This enhancement allows `BrokerData` to be used in ordered collections and sorting operations.

While the implementations are correct, it's important to consider the following:

- The choice of `broker_name` as the ordering key may affect how `BrokerData` is used throughout the system.
- Existing code that works with collections of `BrokerData` might behave differently now that ordering is defined.

To ensure these changes align with the overall system design:

- Review any code that sorts or compares `BrokerData` instances to confirm that ordering by `broker_name` is appropriate.
- Consider documenting this new behavior in the struct's documentation to inform other developers of the ordering semantics.
- If alternative ordering schemes are needed in some contexts, consider implementing custom comparison functions or using wrapper types with different `Ord` implementations.

rocketmq-client/src/producer/default_mq_producer.rs (1)
255-255: LGTM: Good use of the builder pattern in `new()`.

The update to the `new()` method to use the builder pattern is a good improvement. It simplifies the creation of a new `DefaultMQProducer` instance and is consistent with modern Rust practices.

Consider adding a `#[inline]` attribute to this method as well, similar to the `builder()` method. This could potentially improve performance for frequent instantiations.

rocketmq-client/src/latency/latency_fault_tolerance_impl.rs (1)
25-32: Derive `Clone` for `LatencyFaultToleranceImpl<R, S>` if cloneability is required

The struct `LatencyFaultToleranceImpl<R, S>` contains fields like `Option<R>` and `Option<S>`. If instances of this struct need to be cloned elsewhere in your code, consider deriving `Clone`:

```rust
#[derive(Clone)]
pub struct LatencyFaultToleranceImpl<R, S> {
    // ... existing fields ...
}
```

This makes it easier to work with the struct when cloning is necessary.
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (1)
Line range hint
828-837: Ensure semaphore permits are released after the async task completes

Currently, semaphore permits are dropped immediately after spawning the async task in `execute_async_message_send`. This may lead to incorrect backpressure handling because the permits should be held until the async task completes. Consider moving the permit release into the async task so that the permits are released after the task finishes.

Apply this diff to fix the permit handling:
```diff
-        self.get_async_sender_executor().get_handle().spawn(f);
-        drop((acquire_value_num, acquire_value_size));
+        self.get_async_sender_executor().get_handle().spawn(async move {
+            let result = f.await;
+            drop((acquire_value_num, acquire_value_size));
+            result
+        });
```
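The intent of that fix can be illustrated with a std-threads sketch (no tokio required; `Permit` and `spawn_with_permit` are hypothetical names invented for this illustration): a guard is moved into the task, so the permit is released only when the task body finishes, not at spawn time.

```rust
use std::sync::mpsc;
use std::thread;

// A permit that notifies its issuer when dropped, mimicking a semaphore
// permit that must be held until the spawned work completes.
struct Permit {
    release: mpsc::Sender<()>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        // Signal release; ignore the error if the receiver is gone.
        let _ = self.release.send(());
    }
}

// Moving the permit into the task ties its lifetime to the task body,
// which is the point of the suggested diff above.
fn spawn_with_permit(release: mpsc::Sender<()>) -> thread::JoinHandle<()> {
    let permit = Permit { release };
    thread::spawn(move || {
        // ... do the message-send work here ...
        drop(permit); // released only after the task body finishes
    })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = spawn_with_permit(tx);
    handle.join().unwrap();
    // The release signal arrives only after the task has completed.
    assert!(rx.try_recv().is_ok());
    println!("permit released after task completion");
}
```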
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (9)
- rocketmq-client/src/implementation/mq_client_api_impl.rs (2 hunks)
- rocketmq-client/src/latency/latency_fault_tolerance.rs (5 hunks)
- rocketmq-client/src/latency/latency_fault_tolerance_impl.rs (5 hunks)
- rocketmq-client/src/latency/mq_fault_strategy.rs (7 hunks)
- rocketmq-client/src/latency/resolver.rs (1 hunks)
- rocketmq-client/src/producer/default_mq_producer.rs (1 hunks)
- rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (11 hunks)
- rocketmq-remoting/src/protocol/route/route_data_view.rs (2 hunks)
- rocketmq-remoting/src/protocol/route/topic_route_data.rs (1 hunks)
🧰 Additional context used
🔇 Additional comments (10)
rocketmq-remoting/src/protocol/route/route_data_view.rs (3)
18-18: LGTM: Necessary import added.

The `Ordering` import is correctly added and is required for the new trait implementations.
39-43: LGTM: Correct implementation of `PartialOrd`.

The `PartialOrd` implementation for `BrokerData` is correct and follows Rust's best practices by delegating to the `Ord` implementation. This ensures consistency between partial and total ordering.
45-49: LGTM: Correct implementation of `Ord`. Verify ordering choice.

The `Ord` implementation for `BrokerData` is correct and provides a total ordering based on the `broker_name` field. This allows for sorting and using `BrokerData` in ordered collections.

Please confirm that ordering `BrokerData` by `broker_name` is the intended behavior for all use cases. If there are scenarios where a different ordering might be needed, consider implementing a custom `Ord` trait or using a separate comparison function.

To verify the usage of this ordering, you can run the following script:
This will help identify any potential impacts of the new ordering implementation on existing code.
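The wrapper-type option mentioned earlier can be sketched as follows. This is a simplified stand-in: `BrokerData` here carries only a `broker_name` and a hypothetical `cluster` field for illustration, not the real struct.

```rust
use std::cmp::Ordering;

// Simplified stand-in for the real BrokerData (illustration only).
#[derive(Debug, PartialEq, Eq)]
pub struct BrokerData {
    pub broker_name: String,
    pub cluster: String,
}

// Default ordering: by broker_name, as in the reviewed change.
impl Ord for BrokerData {
    fn cmp(&self, other: &Self) -> Ordering {
        self.broker_name.cmp(&other.broker_name)
    }
}
impl PartialOrd for BrokerData {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other)) // delegate to Ord for consistency
    }
}

// Wrapper type providing an alternative ordering (by cluster) without
// touching the default Ord implementation.
#[derive(Debug, PartialEq, Eq)]
pub struct ByCluster(pub BrokerData);

impl Ord for ByCluster {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.cluster.cmp(&other.0.cluster)
    }
}
impl PartialOrd for ByCluster {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let a = BrokerData { broker_name: "broker-a".into(), cluster: "c2".into() };
    let b = BrokerData { broker_name: "broker-b".into(), cluster: "c1".into() };
    assert!(a < b);                       // default: by broker_name
    assert!(ByCluster(b) < ByCluster(a)); // wrapper: by cluster
    println!("both orderings hold");
}
```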
rocketmq-client/src/producer/default_mq_producer.rs (2)
250-253: LGTM: Good addition of a builder method.

The introduction of the `builder()` method is a positive change. It promotes the use of the builder pattern, which is beneficial for creating complex objects like `DefaultMQProducer`. The `#[inline]` attribute is also a good optimization hint for the compiler.
250-255: Verify usage of `DefaultMQProducer` in the codebase.

The introduction of the builder pattern for `DefaultMQProducer` is a good improvement. However, it's important to ensure that all existing instantiations of `DefaultMQProducer` in the codebase are updated to use the new builder pattern if necessary.

Run the following script to check for direct instantiations of `DefaultMQProducer` that might need to be updated:

✅ Verification successful
DefaultMQProducer usage is properly updated.
All instantiations of `DefaultMQProducer` utilize the new builder pattern, and no direct `new()` calls were found elsewhere in the codebase.

🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Find direct instantiations of DefaultMQProducer that might need updating

# Search for DefaultMQProducer::new() calls
echo "Searching for DefaultMQProducer::new() calls:"
rg "DefaultMQProducer::new\(\)" --type rust

# Search for DefaultMQProducer { .. } struct instantiations
echo -e "\nSearching for DefaultMQProducer { .. } struct instantiations:"
rg "DefaultMQProducer\s*\{" --type rust
```

Length of output: 761
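For reference, the builder shape being discussed can be sketched with a minimal, self-contained model. The `Producer` fields here (`producer_group`, `send_msg_timeout`) are hypothetical simplifications; the real `DefaultMQProducer` builder exposes many more options.

```rust
// Minimal model of the builder pattern used for DefaultMQProducer.
#[derive(Debug, Default, PartialEq)]
pub struct Producer {
    producer_group: String,
    send_msg_timeout: u64,
}

#[derive(Default)]
pub struct ProducerBuilder {
    producer_group: Option<String>,
    send_msg_timeout: Option<u64>,
}

impl Producer {
    // Entry point mirroring the reviewed `builder()` method.
    #[inline]
    pub fn builder() -> ProducerBuilder {
        ProducerBuilder::default()
    }

    // `new()` can now simply delegate to the builder with defaults.
    #[inline]
    pub fn new() -> Self {
        Self::builder().build()
    }
}

impl ProducerBuilder {
    pub fn producer_group(mut self, group: impl Into<String>) -> Self {
        self.producer_group = Some(group.into());
        self
    }

    pub fn send_msg_timeout(mut self, millis: u64) -> Self {
        self.send_msg_timeout = Some(millis);
        self
    }

    pub fn build(self) -> Producer {
        Producer {
            producer_group: self.producer_group.unwrap_or_default(),
            send_msg_timeout: self.send_msg_timeout.unwrap_or(3000),
        }
    }
}

fn main() {
    let p = Producer::builder()
        .producer_group("test_group")
        .send_msg_timeout(5000)
        .build();
    assert_eq!(p.producer_group, "test_group");
    println!("{p:?}");
}
```

A design note: consuming `self` in each setter keeps call sites to a single fluent chain and prevents a half-configured builder from being reused accidentally.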
rocketmq-client/src/latency/resolver.rs (1)
19-20: Introduction of `ResolverLocal` trait with async `resolve` method

The new `ResolverLocal` trait with an asynchronous `resolve` method aligns with the shift towards non-blocking operations in the codebase. This change enhances the system's ability to perform asynchronous resolution, improving overall concurrency and responsiveness.

Run the following script to ensure all implementations of `ResolverLocal` correctly handle the async `resolve` method:

rocketmq-client/src/latency/latency_fault_tolerance.rs (1)
74-74: ⚠️ Potential issue

Ensure proper handling of `Option<T>` in `pick_one_at_least`

The method `pick_one_at_least` now returns `Option<T>` instead of `T`. Please verify that all implementations and call sites of this method have been updated to handle the `None` case appropriately to prevent potential runtime errors.

Run the following script to find usages of `pick_one_at_least` and check for proper handling:

✅ Verification successful
No usages of `pick_one_at_least` found. Verification successful.

🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Find all usages of `pick_one_at_least` to ensure `Option<T>` is handled correctly.

# Search for method invocations
rg 'pick_one_at_least\(' -A 3 -B 2
```

Length of output: 1159
Script:
```bash
#!/bin/bash
# Description: Use ast-grep to find all call sites of `pick_one_at_least` and check for proper `Option<T>` handling.

# Search for function calls and surrounding handling
ast-grep --pattern '
match $$$ {
    Some($_) => { $$$ },
    None => { $$$ },
}
' --lang rust
```

Length of output: 96
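The call-site change being checked for can be sketched as follows; the free function and the `"default-queue"` fallback are invented for illustration, not the real API:

```rust
// Before the change, pick_one_at_least returned T and could not signal
// "nothing available"; returning Option<T> forces callers to handle it.
fn pick_one_at_least(candidates: &[String]) -> Option<String> {
    candidates.first().cloned()
}

fn choose_queue(candidates: &[String]) -> String {
    // Call sites must now handle None explicitly instead of assuming a value.
    match pick_one_at_least(candidates) {
        Some(queue) => queue,
        None => "default-queue".to_string(), // fallback instead of a panic
    }
}

fn main() {
    assert_eq!(choose_queue(&["q1".to_string()]), "q1");
    assert_eq!(choose_queue(&[]), "default-queue");
    println!("ok");
}
```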
rocketmq-client/src/implementation/mq_client_api_impl.rs (3)
460-462: Correctly awaiting the asynchronous `update_fault_item` method

The `update_fault_item` method is now asynchronous, and it's appropriate to `await` it to ensure fault items are updated correctly.
475-477: Duplicate comment: Correctly awaiting `update_fault_item`

As previously mentioned, awaiting the asynchronous `update_fault_item` method is necessary here as well.
481-483: Duplicate comment: Correctly awaiting `update_fault_item`

As previously noted, awaiting the asynchronous `update_fault_item` method ensures proper error handling and fault item updates.
```rust
fn as_any(&self) -> &dyn Any;

fn set_service_detector(&mut self, service_detector: Box<dyn ServiceDetector>);

fn as_any_mut(&mut self) -> &mut dyn Any;
```
🛠️ Refactor suggestion
Reconsider the use of as_any and as_any_mut for downcasting
Introducing as_any and as_any_mut methods to enable downcasting can lead to fragile code and may indicate that the trait's abstraction is leaking. Consider redesigning the trait or its implementations to eliminate the need for downcasting. Alternative approaches include:
- Using associated types or generic parameters to retain type information.
- Defining additional trait methods that expose the necessary functionality without requiring downcasting.
- Employing an enum to represent different concrete types if the set of possible types is known and finite.
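The enum alternative from the last bullet can be sketched like this; the `Resolver` trait and the two concrete types are invented for illustration, not taken from the codebase:

```rust
// Instead of adding `fn as_any(&self) -> &dyn Any` to the trait for later
// downcasting, an enum over the known implementations keeps full type
// information and lets the compiler check exhaustiveness.
trait Resolver {
    fn resolve(&self, name: &str) -> Option<String>;
}

struct DnsResolver;
struct StaticResolver(String);

impl Resolver for DnsResolver {
    fn resolve(&self, _name: &str) -> Option<String> {
        None // placeholder: a real DNS lookup would go here
    }
}

impl Resolver for StaticResolver {
    fn resolve(&self, _name: &str) -> Option<String> {
        Some(self.0.clone())
    }
}

// The closed set of resolver kinds, dispatched without any downcasting.
enum AnyResolver {
    Dns(DnsResolver),
    Static(StaticResolver),
}

impl AnyResolver {
    fn resolve(&self, name: &str) -> Option<String> {
        match self {
            AnyResolver::Dns(r) => r.resolve(name),
            AnyResolver::Static(r) => r.resolve(name),
        }
    }
}

fn main() {
    let resolvers = vec![
        AnyResolver::Dns(DnsResolver),
        AnyResolver::Static(StaticResolver("127.0.0.1:10911".into())),
    ];
    let addrs: Vec<Option<String>> =
        resolvers.iter().map(|r| r.resolve("broker-a")).collect();
    assert_eq!(addrs[0], None);
    assert_eq!(addrs[1].as_deref(), Some("127.0.0.1:10911"));
    println!("resolved without downcasting");
}
```

The trade-off: adding a new implementation requires touching the enum, which is acceptable only when the set of concrete types is closed, as the bullet notes.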
```rust
use rocketmq_common::ArcRefCellWrapper;

#[allow(async_fn_in_trait)]
pub trait LatencyFaultTolerance<T>: Send + Sync + 'static {
```
Consider using async_trait instead of #[allow(async_fn_in_trait)]
Using #[allow(async_fn_in_trait)] permits the use of async functions in traits but relies on an unstable feature that requires nightly Rust. To ensure compatibility with stable Rust, consider using the async_trait crate, which provides a macro to enable async functions in traits safely.
Apply this diff to use async_trait:
```diff
+use async_trait::async_trait;

-#[allow(async_fn_in_trait)]
-pub trait LatencyFaultTolerance<T, R, S>: Send + Sync + 'static {
+#[async_trait]
+pub trait LatencyFaultTolerance<T, R, S>: Send + Sync + 'static {
```

Committable suggestion was skipped due to low confidence.
```rust
let mut flag = false;
let handle = Handle::current();
scope(|s| {
    s.spawn(|| {
        flag = handle.block_on(async {
            self.latency_fault_tolerance
                .is_reachable(&message_queue.get_broker_name().to_string())
                .await
        });
    });
});
flag
```
Data race due to unsynchronized mutation of flag in ReachableFilter::filter
The variable flag is mutated inside a spawned thread without synchronization, which can lead to data races and undefined behavior.
Consider removing the thread and directly blocking on the async function to avoid the data race. Apply this diff to fix the issue:
```diff
-        let mut flag = false;
-        let handle = Handle::current();
-        scope(|s| {
-            s.spawn(|| {
-                flag = handle.block_on(async {
-                    self.latency_fault_tolerance
-                        .is_reachable(&message_queue.get_broker_name().to_string())
-                        .await
-                });
-            });
-        });
-        flag
+        let handle = Handle::current();
+        let flag = handle.block_on(async {
+            self.latency_fault_tolerance
+                .is_reachable(&message_queue.get_broker_name().to_string())
+                .await
+        });
+        flag
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let mut flag = false;
let handle = Handle::current();
scope(|s| {
    s.spawn(|| {
        flag = handle.block_on(async {
            self.latency_fault_tolerance
                .is_available(&message_queue.get_broker_name().to_string())
                .await
        });
    });
});
flag
```
Data race due to unsynchronized mutation of flag in AvailableFilter::filter
Similar to the previous issue, the variable flag is mutated inside a spawned thread without proper synchronization, causing potential data races.
Apply the same fix by removing the thread and directly blocking on the async function:
```diff
-        let mut flag = false;
-        let handle = Handle::current();
-        scope(|s| {
-            s.spawn(|| {
-                flag = handle.block_on(async {
-                    self.latency_fault_tolerance
-                        .is_available(&message_queue.get_broker_name().to_string())
-                        .await
-                });
-            });
-        });
-        flag
+        let handle = Handle::current();
+        let flag = handle.block_on(async {
+            self.latency_fault_tolerance
+                .is_available(&message_queue.get_broker_name().to_string())
+                .await
+        });
+        flag
```

📝 Committable suggestion
```rust
let mut fault_item_table = self.fault_item_table.lock().await;
let mut remove_set = HashSet::new();
for (name, fault_item) in fault_item_table.iter() {
    if get_current_millis() as i64
        - (fault_item
            .check_stamp
            .load(std::sync::atomic::Ordering::Relaxed) as i64)
        < 0
    {
        continue;
    }
    fault_item.check_stamp.store(
        get_current_millis() + self.detect_interval as u64,
        std::sync::atomic::Ordering::Release,
    );
    let broker_addr = self
        .resolver
        .as_ref()
        .unwrap()
        .resolve(fault_item.name.as_str())
        .await;
    if broker_addr.is_none() {
        remove_set.insert(name.clone());
        continue;
    }
    if self.service_detector.is_none() {
        continue;
    }
    let service_ok = self
        .service_detector
        .as_ref()
        .unwrap()
        .detect(broker_addr.unwrap().as_str(), self.detect_timeout as u64);
    if service_ok
        && fault_item
            .reachable_flag
            .load(std::sync::atomic::Ordering::Acquire)
    {
        info!("{} is reachable now, then it can be used.", name);
```
Avoid holding the fault_item_table lock across await points to prevent potential deadlocks
In the detect_by_one_round method, you're holding the fault_item_table mutex lock while performing asynchronous operations like resolver.resolve().await. Holding an asynchronous mutex across await points can lead to reduced concurrency or potential deadlocks because other tasks cannot access the locked resource until the lock is released.
To address this, consider collecting the necessary data while holding the lock and then release it before performing any asynchronous operations. Here's how you might adjust the code:
```rust
async fn detect_by_one_round(&self) {
    let fault_items: Vec<(String, FaultItem)> = {
        let fault_item_table = self.fault_item_table.lock().await;
        fault_item_table.iter().map(|(name, item)| (name.clone(), item.clone())).collect()
    };
    let mut remove_set = HashSet::new();
    for (name, fault_item) in fault_items {
        if get_current_millis() as i64 - (fault_item.check_stamp.load(Ordering::Relaxed) as i64) < 0 {
            continue;
        }
        fault_item.check_stamp.store(
            get_current_millis() + self.detect_interval as u64,
            Ordering::Release,
        );
        let broker_addr = match self.resolver.as_ref() {
            Some(resolver) => resolver.resolve(fault_item.name.as_str()).await,
            None => {
                remove_set.insert(name.clone());
                continue;
            }
        };
        if broker_addr.is_none() {
            remove_set.insert(name.clone());
            continue;
        }
        if let Some(service_detector) = self.service_detector.as_ref() {
            let service_ok = service_detector
                .detect(broker_addr.unwrap().as_str(), self.detect_timeout as u64);
            if service_ok && fault_item.reachable_flag.load(Ordering::Acquire) {
                info!("{} is reachable now, then it can be used.", name);
                fault_item.reachable_flag.store(true, Ordering::Release);
            }
        }
    }
    let mut fault_item_table = self.fault_item_table.lock().await;
    for name in remove_set {
        fault_item_table.remove(&name);
    }
}
```

This modification collects the necessary data upfront and releases the lock before the asynchronous calls, enhancing concurrency and preventing potential deadlocks.
```rust
let broker_addr = self
    .resolver
    .as_ref()
    .unwrap()
    .resolve(fault_item.name.as_str())
    .await;
```
Handle the possibility of resolver being None to prevent potential panics
You're using self.resolver.as_ref().unwrap() without checking if resolver is Some. If resolver is None, this will cause a panic at runtime.
Consider handling the None case gracefully:
```rust
let broker_addr = match self.resolver.as_ref() {
    Some(resolver) => resolver.resolve(fault_item.name.as_str()).await,
    None => {
        remove_set.insert(name.clone());
        continue;
    }
};
```

This ensures that your code doesn't panic and handles the scenario where `resolver` is not set.
```rust
fn start_detector(this: ArcRefCellWrapper<Self>) {
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
            if !this
                .start_detector_enable
                .load(std::sync::atomic::Ordering::Relaxed)
            {
                continue;
            }

            this.detect_by_one_round().await;
        }
    });
```
Implement a shutdown mechanism for the detector task to prevent resource leaks
In the start_detector method, you're spawning a Tokio task that runs indefinitely. Without a way to stop this task, it could continue running even when the instance is no longer needed, leading to resource leaks.
Consider adding a shutdown signal using a tokio::sync::Notify or a cancellation token to allow the task to exit gracefully when it's appropriate.
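A std-threads analogue of the suggested shutdown mechanism is sketched below; in the actual tokio code one would use `tokio::sync::Notify` or a cancellation token instead of an `AtomicBool` plus `thread::sleep`, but the ownership pattern is the same: the loop checks a shared stop signal each iteration and exits gracefully.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Spawn a detector loop that exits when the shared shutdown flag is set,
// returning how many detection rounds it completed.
fn start_detector(shutdown: Arc<AtomicBool>) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut rounds = 0;
        while !shutdown.load(Ordering::Acquire) {
            // detect_by_one_round() would run here
            rounds += 1;
            thread::sleep(Duration::from_millis(10));
        }
        rounds // loop ends instead of running forever
    })
}

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let handle = start_detector(Arc::clone(&shutdown));
    thread::sleep(Duration::from_millis(50));
    shutdown.store(true, Ordering::Release);
    let rounds = handle.join().unwrap();
    println!("detector stopped after {rounds} rounds");
}
```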
Codecov Report

Attention: Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #1068      +/-   ##
==========================================
- Coverage   19.86%   19.80%   -0.06%
==========================================
  Files         426      426
  Lines       35561    35659      +98
==========================================
+ Hits         7063     7064       +1
- Misses      28498    28595      +97
```

☔ View full report in Codecov by Sentry.
Which Issue(s) This PR Fixes(Closes)
Fixes #1067
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
- Introduced a new `ResolverLocal` trait for non-blocking resolution of broker addresses.

Bug Fixes

- Improved `topic_route_data_changed` to ensure accurate detection of topic route data changes.
Refactor
- Simplified creation of `DefaultMQProducer` using a builder pattern.