[ISSUE #1411]🔥BrokerRequestProcessor add QueryAssignmentProcessor logic handle🚨#1412
[ISSUE #1411]🔥BrokerRequestProcessor add QueryAssignmentProcessor logic handle🚨#1412rocketmq-rust-bot merged 1 commit intomainfrom
Conversation
WalkthroughThe changes in this pull request primarily focus on modifications to various files within the Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1412 +/- ##
==========================================
+ Coverage 21.11% 21.25% +0.13%
==========================================
Files 443 443
Lines 57146 57212 +66
==========================================
+ Hits 12069 12161 +92
+ Misses 45077 45051 -26 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (10)
rocketmq-broker/src/processor.rs (1)
163-168: Consider adding documentation for the new request handling logic.While the implementation is correct, it would be beneficial to add documentation explaining:
- The purpose and expected behavior of
QueryConsumeQueueandSetMessageRequestModerequest handling- The relationship between these request codes and the assignment processor
- Any specific conditions or requirements for these operations
Would you like me to help draft the documentation comments?
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (4)
Line range hint
89-124: Consider making max check times configurableThe implementation handles messages that have been checked too many times, but the threshold for "too many times" isn't explicitly defined or configurable.
Consider adding a configuration parameter for the maximum number of checks:
pub struct DefaultTransactionalMessageCheckListener<MS> { inner: TransactionalMessageCheckListenerInner, topic_config_manager: TopicConfigManager, message_store: ArcMut<MS>, + max_check_times: u32, }
Line range hint
156-157: Replace unwrap with proper error handlingThe
unwrap_or_default()on queue_id parsing could panic if the string is malformed.Consider using proper error handling:
- msg_ext.set_queue_id(queue_id.as_str().parse::<i32>().unwrap_or_default()); + msg_ext.set_queue_id(queue_id.as_str().parse::<i32>().unwrap_or_else(|e| { + warn!("Failed to parse queue_id '{}': {}", queue_id, e); + 0 + }));
Line range hint
170-175: Enhance error handling for null channel caseThe current implementation only logs a warning when the channel is null, which might lead to silent failures.
Consider propagating the error:
} else { - warn!( - "Check transaction failed, channel is null. groupId={:?}", - group_id - ); + return Err(crate::Error::Custom(format!( + "Check transaction failed: no available channel for group {:?}", + group_id + ))); }
Line range hint
246-247: Resolve TODO comment for queue_id randomizationThe commented out code for queue_id randomization should be addressed.
Would you like me to help implement a proper queue_id selection strategy that:
- Uses a deterministic approach for testing
- Implements proper randomization for production code
- Adds test cases for queue_id selection?
rocketmq-broker/src/client/consumer_group_info.rs (2)
Line range hint
388-402: Enhance test coverage for subscription updatesThe current test only verifies the basic happy path. Consider adding test cases for:
- Subscription version handling (when sub_version is higher/lower)
- Topic removal scenarios
- Multiple topic subscriptions
- Verification of the actual subscription data after update
Here's a suggested enhancement:
#[test] fn consumer_group_info_update_subscription() { let group_name = "test_group".to_string(); let consume_type = ConsumeType::ConsumePassively; let message_model = MessageModel::Clustering; let consume_from_where = ConsumeFromWhere::ConsumeFromLastOffset; let consumer_group_info = ConsumerGroupInfo::new( group_name.clone(), consume_type, message_model, consume_from_where, ); let mut sub_list = HashSet::new(); let subscription_data = SubscriptionData { topic: "topic".into(), sub_string: "sub_string".into(), + sub_version: 1, ..Default::default() }; sub_list.insert(subscription_data.clone()); // Test initial subscription assert!(consumer_group_info.update_subscription(&sub_list)); + + // Verify subscription was stored correctly + let stored_sub = consumer_group_info.find_subscription_data(&"topic".into()).unwrap(); + assert_eq!(stored_sub.topic, subscription_data.topic); + + // Test update with higher version + let mut updated_sub = subscription_data.clone(); + updated_sub.sub_version = 2; + let mut new_sub_list = HashSet::new(); + new_sub_list.insert(updated_sub); + assert!(consumer_group_info.update_subscription(&new_sub_list)); + + // Test update with lower version (should not update) + let mut old_sub = subscription_data; + old_sub.sub_version = 0; + let mut old_sub_list = HashSet::new(); + old_sub_list.insert(old_sub); + assert!(!consumer_group_info.update_subscription(&old_sub_list)); + + // Test topic removal + assert!(consumer_group_info.update_subscription(&HashSet::new())); + assert!(consumer_group_info.get_subscribe_topics().is_empty()); }
Based on the previous search results, I can generate the final response as we have enough context. We found that:
- The
update_channelmethod is a critical part of the consumer group functionality, used in bothconsumer_group_info.rsandconsumer_manager.rs- The only test for this functionality is currently commented out
- There are no other active tests covering this functionality
Re-enable the commented out test case for update_channel
The test case for
update_channelis currently the only test coverage for this critical functionality that manages consumer channel updates. The test appears to be properly structured, testing the basic functionality with valid inputs. Since this method is used in the consumer manager and affects the broker's client handling capabilities, having it untested poses a reliability risk.
- Re-enable the commented test in
rocketmq-broker/src/client/consumer_group_info.rsby removing the comment markers/*and*/🔗 Analysis chain
Line range hint
347-387: Re-enable the commented out test case for update_channelThe test case for
update_channelis currently commented out, leaving this critical functionality untested. This method handles important client connection updates and should be properly tested.Let's verify if this test case is the only one commented out:
Let me verify if there are any other test cases for the
update_channelfunctionality that might be active, and also check the implementation to understand the importance of this test:
Let me check the implementation of
update_channelto understand its functionality and verify if it's being tested elsewhere:
Let me try one more search with a different pattern to find the implementation and any related tests:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for commented out test cases in the codebase rg -U "(/\*|\s*//\s*)\s*#\[test\]" --type rustLength of output: 354
Script:
#!/bin/bash # Search for update_channel tests and implementation rg "update_channel" --type rust -A 5 -B 5 # Also search for the specific test pattern ast-grep --pattern 'fn update_channel($$$) { $$$ }'Length of output: 3569
Script:
#!/bin/bash # Get the implementation of update_channel ast-grep --pattern 'pub fn update_channel($$$) { $$$ }' # Check if there are any other tests for this functionality rg "test.*update.*channel" --type rustLength of output: 98
Script:
#!/bin/bash # Search for the update_channel implementation with more context rg -U "pub fn update_channel.*?\{.*?\}" --multiline --type rust -A 20 # Also check for any integration tests that might be testing this functionality fd -e rs -t f "test" --exec rg -l "update_channel"Length of output: 120
rocketmq-broker/src/processor/query_assignment_processor.rs (3)
166-166: UseHashSet::new()instead ofHashSet::with_capacity(0)Using
HashSet::new()is more idiomatic and clearer when creating an emptyHashSetwithout pre-allocated capacity.Apply this diff to improve code clarity:
- HashSet::with_capacity(0) + HashSet::new()
Line range hint
297-302: Verify the condition forpop_share_queue_numThe condition
pop_share_queue_num <= 0 || pop_share_queue_num >= cid_all.len() as i32 - 1may lead to unexpected behavior whenpop_share_queue_numequalscid_all.len() as i32 - 1. Please verify if the comparison should be>= cid_all.len() as i32to prevent potential off-by-one errors.
403-475: Refactor tests to reduce code duplicationThe test cases contain repeated setup code for common variables like
consumer_group,current_cid,mq_all, andcid_all. Consider creating helper functions or utilizing test fixtures to minimize duplication and improve maintainability.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (9)
rocketmq-broker/src/client/client_channel_info.rs(0 hunks)rocketmq-broker/src/client/consumer_group_info.rs(1 hunks)rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs(0 hunks)rocketmq-broker/src/load_balance/message_request_mode_manager.rs(1 hunks)rocketmq-broker/src/processor.rs(1 hunks)rocketmq-broker/src/processor/end_transaction_processor.rs(1 hunks)rocketmq-broker/src/processor/query_assignment_processor.rs(10 hunks)rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs(1 hunks)rocketmq-broker/src/util/hook_utils.rs(0 hunks)
💤 Files with no reviewable changes (3)
- rocketmq-broker/src/client/client_channel_info.rs
- rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs
- rocketmq-broker/src/util/hook_utils.rs
✅ Files skipped from review due to trivial changes (1)
- rocketmq-broker/src/load_balance/message_request_mode_manager.rs
🔇 Additional comments (6)
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (2)
Line range hint 285-289: LGTM: Test case improvements
The test case modifications look good. The test properly verifies empty body handling.
Line range hint 1-304: Verify alignment with QueryAssignmentProcessor objectives
While the code changes look solid, we should verify how this transactional message handling relates to the PR's main objective of adding QueryAssignmentProcessor logic.
rocketmq-broker/src/processor/query_assignment_processor.rs (4)
132-133: Ensure retry topics are processed in Pull mode
The code correctly sets the message request mode to Pull for retry topics starting with RETRY_GROUP_TOPIC_PREFIX. This ensures that retry topics are handled appropriately.
Line range hint 185-199: Properly handle broadcasting consumers in load balancing
The implementation correctly handles broadcasting consumers by retrieving the topic subscription information and returning it. It also logs a warning if no assignment is found.
Line range hint 216-224: Avoid unnecessary warnings for retry topics
When mq_set is None, the code logs a warning unless the topic is a retry topic. This prevents unnecessary warnings for retry topics that may not have subscriptions.
Line range hint 315-319: Ensure correct index calculation in loop
In the loop within allocate_for_pop, the index is incremented and wrapped using modulo. Verify that this logic correctly cycles through cid_all without causing out-of-bounds errors or skipping consumers.
Run the following script to verify the index computations:
✅ Verification successful
Let me gather more context about the index initialization and the loop structure.
Index calculation in loop is safe and correct
The index calculation is properly implemented with no risk of out-of-bounds errors or skipped consumers because:
- The index is initialized using
position()to find the current consumer's position incid_all - The loop runs
pop_share_queue_num - 1times, incrementing and wrapping the index using modulo - The modulo operation
index %= cid_all.len()ensures the index always stays within bounds - The array access
cid_all[index]is safe as index is always <cid_all.len() - The loop only executes when
cid_all.len() <= mq_all.len()andpop_share_queue_num < cid_all.len() - 1
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential out-of-bounds access or logic errors in index calculation.
# Navigate to the method definition
rg -A 20 'fn allocate_for_pop' rocketmq-broker/src/processor/query_assignment_processor.rs
# Search for index usage and ensure proper bounds
rg -A 5 'index\s*[\+\-]=\s*\d+' rocketmq-broker/src/processor/query_assignment_processor.rs
# Ensure that index is always within 0 and cid_all.len() - 1
Length of output: 1283
Script:
#!/bin/bash
# Get more context around the index usage to see initialization and loop structure
rg -B 10 -A 10 'index\s*\+=\s*1' rocketmq-broker/src/processor/query_assignment_processor.rs
# Also check if there are any other similar index calculations in the file
ast-grep --pattern 'let mut index = $_'
Length of output: 2237
| RequestCode::QueryConsumeQueue | RequestCode::SetMessageRequestMode => { | ||
| self.query_assignment_processor | ||
| .process_request(channel, ctx, request_code, request) | ||
| .await | ||
| } |
There was a problem hiding this comment.
💡 Codebase verification
Missing implementation for QueryConsumeQueue request code
The QueryAssignmentProcessor's match statement doesn't handle the QueryConsumeQueue request code, despite it being routed to this processor in the BrokerRequestProcessor. This mismatch could lead to the request being silently dropped via the _ => None catch-all case.
rocketmq-broker/src/processor/query_assignment_processor.rs: Add a match arm to handleRequestCode::QueryConsumeQueuerocketmq-broker/src/processor.rs: Review whetherQueryConsumeQueueshould actually be routed toquery_assignment_processor
🔗 Analysis chain
LGTM! The implementation follows the established pattern.
The new match arm correctly delegates the QueryConsumeQueue and SetMessageRequestMode request codes to the query_assignment_processor, maintaining consistency with other request handlers.
Let's verify the request code handling implementation:
Let me gather more information about the QueryAssignmentProcessor implementation, specifically focusing on how it handles the QueryConsumeQueue request code.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the QueryAssignmentProcessor implementation and its usage
# Check if QueryAssignmentProcessor properly handles these request codes
ast-grep --pattern 'impl $_ for QueryAssignmentProcessor {
$$$
process_request($$$) {
$$$
match request_code {
$$$
}
$$$
}
$$$
}'
# Verify these request codes are properly defined
rg -A 1 'QueryConsumeQueue|SetMessageRequestMode'
Length of output: 7562
Script:
#!/bin/bash
# Check QueryAssignmentProcessor's implementation for QueryConsumeQueue handling
rg -A 10 'match request_code \{' rocketmq-broker/src/processor/query_assignment_processor.rs
Length of output: 450
| let msg_ext = MessageExt::default(); | ||
| //msg_ext.set_body(None); | ||
| let msg_inner = end_message_transaction(&msg_ext); | ||
| assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty())); |
There was a problem hiding this comment.
The test end_message_transaction_with_empty_body no longer tests an empty message body
By commenting out msg_ext.set_body(None);, the test end_message_transaction_with_empty_body no longer simulates a message with an empty body. This means the test might not fulfill its intended purpose of ensuring that end_message_transaction handles messages with empty bodies correctly.
To fix this issue, uncomment the line to set the message body to None:
fn end_message_transaction_with_empty_body() {
let msg_ext = MessageExt::default();
- //msg_ext.set_body(None);
+ msg_ext.set_body(None);
let msg_inner = end_message_transaction(&msg_ext);
assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty()));
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let msg_ext = MessageExt::default(); | |
| //msg_ext.set_body(None); | |
| let msg_inner = end_message_transaction(&msg_ext); | |
| assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty())); | |
| let msg_ext = MessageExt::default(); | |
| msg_ext.set_body(None); | |
| let msg_inner = end_message_transaction(&msg_ext); | |
| assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty())); |
Which Issue(s) This PR Fixes(Closes)
Fixes #1411
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Refactor