[ISSUE #1301]🔥Rocketmq-rust broker supports request code QUERY_ASSIGNMENT(400)🚀#1402
[ISSUE #1301]🔥Rocketmq-rust broker supports request code QUERY_ASSIGNMENT(400)🚀#1402rocketmq-rust-bot merged 1 commit intomainfrom
Conversation
WalkthroughThe pull request introduces several significant changes to the RocketMQ broker's codebase. A new public entity, Changes
Assessment against linked issues
Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1402 +/- ##
==========================================
- Coverage 21.20% 21.11% -0.09%
==========================================
Files 443 443
Lines 56917 57146 +229
==========================================
+ Hits 12068 12069 +1
- Misses 44849 45077 +228 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (5)
rocketmq-broker/src/error.rs (1)
31-32: LGTM! Consider adding documentation for the new error variant.The
IllegalArgumentErrorvariant is well-structured and appropriate for handling validation errors.Consider adding documentation to clarify typical usage scenarios:
+ /// Represents errors caused by invalid arguments provided to broker operations. + /// For example, invalid topic names or configuration parameters. #[error("{0}")] IllegalArgumentError(String),rocketmq-broker/src/processor/query_assignment_processor.rs (4)
322-323: Clarify function call toallocateThe function
allocateis called without qualification, which may cause confusion. Sinceallocateis defined outside of the struct, it would be clearer to qualify the function call or refactor it as an associated method.Consider qualifying the function or refactoring it:
-allocate(consumer_group, current_cid, mq_all, cid_all) +self::allocate(consumer_group, current_cid, mq_all, cid_all)Or move
allocateinside theQueryAssignmentProcessorimpl block.
376-383: Usetracingcrate for consistent loggingIn line 377,
log::info!is used for logging, whereas the rest of the code uses thetracingcrate. For consistency and to leverage structured logging, all logs should usetracing.Update the logging statement:
-log::info!( +tracing::info!( "[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {:?}", consumer_group, current_cid, cid_all );
245-252: Provide a default strategy or clear error when strategy is unsupportedWhen an unsupported strategy is requested, the code logs a warning and returns
None. This might leave the client without a clear understanding of the failure.Consider responding with an error message to the client:
-if strategy.is_none() { - warn!( - "QueryLoad: unsupported strategy [{}], {}", - strategy_name, - channel.remote_address() - ); - return None; -} +let strategy = match self.load_strategy.get(strategy_name) { + Some(s) => s, + None => { + warn!( + "QueryLoad: unsupported strategy [{}], {}", + strategy_name, + channel.remote_address() + ); + return Some(RemotingCommand::create_response_command_with_code(ResponseCode::SystemError) + .set_remark(CheetahString::from_static_str("Unsupported load balancing strategy"))); + } +};Alternatively, default to a standard strategy if appropriate.
182-194: Handle empty assignment for broadcasting message modelWhen no assignment is found in broadcasting mode, the code logs a warning but proceeds without an explicit response.
Consider returning a response indicating that no assignments are available:
-if assigned_queue_set.is_none() { +if assigned_queue_set.as_ref().map_or(true, |set| set.is_empty()) { warn!( "QueryLoad: no assignment for group[{}], the topic[{}] does not exist.", consumer_group, topic ); + return Some(RemotingCommand::create_response_command_with_code(ResponseCode::TopicNotExist) + .set_remark(CheetahString::from_static_str("No message queues available for topic"))); }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (4)
rocketmq-broker/src/broker_runtime.rs(7 hunks)rocketmq-broker/src/error.rs(1 hunks)rocketmq-broker/src/processor/query_assignment_processor.rs(4 hunks)rocketmq-common/src/common/broker/broker_config.rs(2 hunks)
🔇 Additional comments (12)
rocketmq-broker/src/error.rs (1)
34-35: LGTM! Verify client error handling implementation.
The ClientError variant with automatic conversion from MQClientError is a good addition for proper error propagation.
Let's verify the client error handling implementation:
✅ Verification successful
Based on the search results, I can now generate the final response since I have comprehensive information about the client error handling implementation:
Client error handling is properly implemented and consistently used across the codebase
The error propagation through #[from] for MQClientError is well integrated with the broader error handling strategy:
- The client errors are properly propagated up through the broker's error type via the
Fromtrait MQClientErroris consistently handled across multiple components:- Producer implementations handle client errors with proper retry logic and error conversion
- Consumer implementations properly propagate client errors with contextual information
- Remote broker operations properly handle connection and timeout related client errors
- Error messages are descriptive and include relevant context
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for consistent error handling patterns across the codebase
# Look for places where MQClientError is handled
rg -t rust "MQClientError" -A 3
# Check for error conversion patterns
ast-grep --pattern 'match $_ {
Err($_) => $_
}'
Length of output: 49203
rocketmq-common/src/common/broker/broker_config.rs (3)
180-180: LGTM! The new configuration field follows Rust naming conventions.
The addition of server_load_balancer_enable to the BrokerConfig struct is well-structured and follows Rust naming conventions.
266-266: LGTM! Default value maintains backward compatibility.
Setting server_load_balancer_enable to true by default is a good choice as it maintains backward compatibility with existing behavior.
180-180: Verify the usage of server_load_balancer_enable.
Let's verify how this new configuration is being used in the codebase.
✅ Verification successful
The server_load_balancer_enable field is properly used in the codebase
The field is used in rocketmq-broker/src/processor/query_assignment_processor.rs as a configuration flag to control load balancer functionality, and it's initialized with a default value of true in the BrokerConfig struct. The implementation appears to be complete and consistent.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find usages of server_load_balancer_enable to ensure it's properly integrated
# Search for direct field access
rg "server_load_balancer_enable" -t rust
# Search for the property name in string form (camelCase as used in properties)
rg "serverLoadBalancerEnable" -t rust
Length of output: 399
rocketmq-broker/src/broker_runtime.rs (6)
84-84: LGTM: Import for TopicRouteInfoManager
The import is correctly placed and necessary for the new functionality.
137-137: LGTM: TopicRouteInfoManager field addition
The field is correctly wrapped in Arc for thread-safe sharing and follows the struct's field organization pattern.
176-176: LGTM: Clone implementation for TopicRouteInfoManager
The Clone implementation correctly handles the Arc wrapper, maintaining consistency with other fields.
265-268: LGTM: TopicRouteInfoManager initialization
The initialization is properly implemented with the required dependencies and follows the established pattern.
749-750: LGTM: TopicRouteInfoManager service start
The manager's start() is correctly called during the broker's basic service initialization.
550-552: Verify QueryAssignmentProcessor parameter changes
The initialization includes the necessary dependencies for QUERY_ASSIGNMENT support. Let's verify the QueryAssignmentProcessor implementation matches these parameters.
✅ Verification successful
QueryAssignmentProcessor parameters match implementation
The QueryAssignmentProcessor implementation in rocketmq-broker/src/processor/query_assignment_processor.rs exactly matches the parameters being passed in the initialization:
message_store_config: Arc<MessageStoreConfig>broker_config: Arc<BrokerConfig>topic_route_info_manager: Arc<TopicRouteInfoManager>consumer_manager: Arc<ConsumerManager>
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify QueryAssignmentProcessor constructor matches the parameters
# Check the QueryAssignmentProcessor implementation
ast-grep --pattern 'impl QueryAssignmentProcessor {
$$$
fn new($message_store_config: $_, $broker_config: $_, $topic_route_info_manager: $_, $consumer_manager: $_) {
$$$
}
$$$
}'
Length of output: 192
Script:
#!/bin/bash
# Let's try a broader search to find the QueryAssignmentProcessor implementation
rg -A 10 "struct QueryAssignmentProcessor"
# Also search for new() implementation specifically
rg -A 10 "impl QueryAssignmentProcessor"
Length of output: 3491
rocketmq-broker/src/processor/query_assignment_processor.rs (2)
115-116: Consistent error handling for decoding failures
Repeated use of .expect() for decoding can lead to multiple panic points. Ensure consistent and graceful error handling throughout the method.
227-231: 🛠️ Refactor suggestion
Avoid potential data race by cloning cid_all
When accessing cid_all, it's good practice to clone the data if it's shared across threads to prevent data races.
Clone cid_all to ensure thread safety:
-let mut cid_all = if let Some(consumer_group_info) = consumer_group_info {
+let cid_all = if let Some(consumer_group_info) = consumer_group_info {
consumer_group_info.get_all_client_ids()
} else {
vec![]
};
+let mut cid_all = cid_all.clone();Likely invalid or redundant comment.
| pub default_message_request_mode: MessageRequestMode, | ||
| pub default_pop_share_queue_num: i32, | ||
| pub load_balance_poll_name_server_interval: u64, | ||
| pub server_load_balancer_enable: bool, |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Add the new field to get_properties() method.
The new configuration field should be included in the properties HashMap for consistency with other configuration fields.
Add the following code to the get_properties() method:
properties.insert(
"forwardTimeout".into(),
self.forward_timeout.to_string().into(),
);
+ properties.insert(
+ "serverLoadBalancerEnable".into(),
+ self.server_load_balancer_enable.to_string().into(),
+ );
propertiesAlso applies to: 266-266
| index += index; | ||
| index %= cid_all.len(); |
There was a problem hiding this comment.
Fix incorrect index incrementation in loop
In the allocate_for_pop function, incrementing the index with index += index; doubles the index value each time, which is likely unintended and may cause index out-of-bounds errors.
Modify the index incrementation to correctly iterate over consumer IDs:
-for _i in 1..pop_share_queue_num {
- index += index;
+for _ in 1..pop_share_queue_num {
+ index += 1;
index %= cid_all.len();
let result = strategy
.allocate(consumer_group, &cid_all[index], mq_all, cid_all)
.unwrap();
allocate_result.extend(result);
}Committable suggestion skipped: line range outside the PR's diff.
| if pop_share_queue_num <= 0 || pop_share_queue_num > cid_all.len() as i32 - 1 { | ||
| Ok(mq_all |
There was a problem hiding this comment.
Correct the validation logic for pop_share_queue_num
The condition pop_share_queue_num > cid_all.len() as i32 - 1 may lead to incorrect behavior, especially when cid_all.len() is 1, resulting in a negative number. This can cause unintended assignment of message queues.
Adjust the condition as follows to ensure proper validation:
-if pop_share_queue_num <= 0 || pop_share_queue_num > cid_all.len() as i32 - 1 {
+if pop_share_queue_num <= 0 || pop_share_queue_num >= cid_all.len() as i32 {This change ensures that pop_share_queue_num is less than the number of consumer IDs.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if pop_share_queue_num <= 0 || pop_share_queue_num > cid_all.len() as i32 - 1 { | |
| Ok(mq_all | |
| if pop_share_queue_num <= 0 || pop_share_queue_num >= cid_all.len() as i32 { | |
| Ok(mq_all |
| QueryAssignmentRequestBody::decode(request.get_body().expect("empty body")) | ||
| .expect("decode QueryAssignmentRequestBody failed"); |
There was a problem hiding this comment.
Handle decoding errors without panicking
Using .expect() will cause the broker to panic if the request body is empty or if decoding fails. It's better to handle these errors gracefully and return an appropriate error response to the client.
Apply this diff to handle errors properly:
-let request_body =
- QueryAssignmentRequestBody::decode(request.get_body().expect("empty body"))
- .expect("decode QueryAssignmentRequestBody failed");
+let request_body = match request.get_body() {
+ Some(body) => match QueryAssignmentRequestBody::decode(body) {
+ Ok(body) => body,
+ Err(e) => {
+ warn!("Failed to decode QueryAssignmentRequestBody: {}", e);
+ return Some(
+ RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
+ .set_remark(CheetahString::from_static_str("Failed to decode request body")),
+ );
+ }
+ },
+ None => {
+ warn!("Received request with empty body");
+ return Some(
+ RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
+ .set_remark(CheetahString::from_static_str("Request body is empty")),
+ );
+ }
+};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| QueryAssignmentRequestBody::decode(request.get_body().expect("empty body")) | |
| .expect("decode QueryAssignmentRequestBody failed"); | |
| let request_body = match request.get_body() { | |
| Some(body) => match QueryAssignmentRequestBody::decode(body) { | |
| Ok(body) => body, | |
| Err(e) => { | |
| warn!("Failed to decode QueryAssignmentRequestBody: {}", e); | |
| return Some( | |
| RemotingCommand::create_response_command_with_code(ResponseCode::SystemError) | |
| .set_remark(CheetahString::from_static_str("Failed to decode request body")), | |
| ); | |
| } | |
| }, | |
| None => { | |
| warn!("Received request with empty body"); | |
| return Some( | |
| RemotingCommand::create_response_command_with_code(ResponseCode::SystemError) | |
| .set_remark(CheetahString::from_static_str("Request body is empty")), | |
| ); | |
| } | |
| }; |
…MENT(400)🚀
Which Issue(s) This PR Fixes(Closes)
Fixes #1301
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
TopicRouteInfoManagerto enhance topic routing management in the broker.QueryAssignmentProcessorwith additional configuration management and refined request processing logic.Bug Fixes