Skip to content

[ISSUE #1411]🔥BrokerRequestProcessor add QueryAssignmentProcessor logic handle🚨#1412

Merged
rocketmq-rust-bot merged 1 commit intomainfrom
feature-1411
Nov 29, 2024
Merged

[ISSUE #1411]🔥BrokerRequestProcessor add QueryAssignmentProcessor logic handle🚨#1412
rocketmq-rust-bot merged 1 commit intomainfrom
feature-1411

Conversation

@mxsm
Copy link
Copy Markdown
Owner

@mxsm mxsm commented Nov 28, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1411

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Enhanced handling of message requests and transactions, improving the overall functionality of the message processing system.
    • Added new methods for managing message checks and handling discard logic for messages.
  • Bug Fixes

    • Improved error handling and validation in message processing methods to ensure stricter adherence to message properties.
  • Documentation

    • Updated test cases to reflect changes in logic and ensure accuracy in message handling scenarios.
  • Refactor

    • Streamlined logic in various methods for better clarity and functionality, particularly in transaction processing and consumer group management.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Nov 28, 2024

Walkthrough

The changes in this pull request primarily focus on modifications to various files within the rocketmq-broker project. Key updates include enhancements to the ConsumerGroupInfo struct, modifications to the BrokerRequestProcessor to handle new request codes, and improvements in transaction processing logic. Several test cases have been commented out or updated, and unnecessary import statements have been removed. Overall, the changes aim to refine the handling of message processing, error management, and improve the clarity of the codebase.

Changes

File Path Change Summary
rocketmq-broker/src/client/client_channel_info.rs Removed unused imports; commented out test cases for client_channel_info_new and client_channel_info_setters.
rocketmq-broker/src/client/consumer_group_info.rs Updated update_channel and update_subscription methods; modified test cases accordingly.
rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs Removed unused import; no functional changes.
rocketmq-broker/src/load_balance/message_request_mode_manager.rs Renamed variable in test function; no functional changes.
rocketmq-broker/src/processor.rs Added new match arm in process_request for QueryConsumeQueue and SetMessageRequestMode.
rocketmq-broker/src/processor/end_transaction_processor.rs Streamlined transaction request handling; updated error handling and test cases.
rocketmq-broker/src/processor/query_assignment_processor.rs Enhanced query_assignment and do_load_balance methods; added new test cases.
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs Implemented resolve_discard_msg method; enhanced error handling and updated test cases.
rocketmq-broker/src/util/hook_utils.rs Improved validation logic in several methods; added additional checks and logging.

Assessment against linked issues

Objective Addressed Explanation
Add QueryAssignmentProcessor logic to BrokerRequestProcessor (#1411)

Possibly related PRs

Suggested labels

feature, enhancement, approved, auto merge, AI review first

Suggested reviewers

  • TeslaRustor
  • SpaceXCN

🐇 In the code where the changes did flow,
A rabbit hops high, with a joyful glow.
With channels refined and tests all aligned,
The message flows smoothly, oh what a find!
Let's celebrate this code, so neat and bright,
Hopping through logic, with pure delight! 🐇


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@rocketmq-rust-bot
Copy link
Copy Markdown
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Nov 28, 2024
@codecov
Copy link
Copy Markdown

codecov bot commented Nov 28, 2024

Codecov Report

Attention: Patch coverage is 88.00000% with 9 lines in your changes missing coverage. Please review.

Project coverage is 21.25%. Comparing base (b0886a1) to head (fcdc2db).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...broker/src/processor/query_assignment_processor.rs 91.17% 6 Missing ⚠️
rocketmq-broker/src/processor.rs 0.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1412      +/-   ##
==========================================
+ Coverage   21.11%   21.25%   +0.13%     
==========================================
  Files         443      443              
  Lines       57146    57212      +66     
==========================================
+ Hits        12069    12161      +92     
+ Misses      45077    45051      -26     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (10)
rocketmq-broker/src/processor.rs (1)

163-168: Consider adding documentation for the new request handling logic.

While the implementation is correct, it would be beneficial to add documentation explaining:

  • The purpose and expected behavior of QueryConsumeQueue and SetMessageRequestMode request handling
  • The relationship between these request codes and the assignment processor
  • Any specific conditions or requirements for these operations

Would you like me to help draft the documentation comments?

rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (4)

Line range hint 89-124: Consider making max check times configurable

The implementation handles messages that have been checked too many times, but the threshold for "too many times" isn't explicitly defined or configurable.

Consider adding a configuration parameter for the maximum number of checks:

 pub struct DefaultTransactionalMessageCheckListener<MS> {
     inner: TransactionalMessageCheckListenerInner,
     topic_config_manager: TopicConfigManager,
     message_store: ArcMut<MS>,
+    max_check_times: u32,
 }

Line range hint 156-157: Replace unwrap with proper error handling

The unwrap_or_default() on queue_id parsing could panic if the string is malformed.

Consider using proper error handling:

-            msg_ext.set_queue_id(queue_id.as_str().parse::<i32>().unwrap_or_default());
+            msg_ext.set_queue_id(queue_id.as_str().parse::<i32>().unwrap_or_else(|e| {
+                warn!("Failed to parse queue_id '{}': {}", queue_id, e);
+                0
+            }));

Line range hint 170-175: Enhance error handling for null channel case

The current implementation only logs a warning when the channel is null, which might lead to silent failures.

Consider propagating the error:

         } else {
-            warn!(
-                "Check transaction failed, channel is null. groupId={:?}",
-                group_id
-            );
+            return Err(crate::Error::Custom(format!(
+                "Check transaction failed: no available channel for group {:?}",
+                group_id
+            )));
         }

Line range hint 246-247: Resolve TODO comment for queue_id randomization

The commented out code for queue_id randomization should be addressed.

Would you like me to help implement a proper queue_id selection strategy that:

  1. Uses a deterministic approach for testing
  2. Implements proper randomization for production code
  3. Adds test cases for queue_id selection?
rocketmq-broker/src/client/consumer_group_info.rs (2)

Line range hint 388-402: Enhance test coverage for subscription updates

The current test only verifies the basic happy path. Consider adding test cases for:

  1. Subscription version handling (when sub_version is higher/lower)
  2. Topic removal scenarios
  3. Multiple topic subscriptions
  4. Verification of the actual subscription data after update

Here's a suggested enhancement:

 #[test]
 fn consumer_group_info_update_subscription() {
     let group_name = "test_group".to_string();
     let consume_type = ConsumeType::ConsumePassively;
     let message_model = MessageModel::Clustering;
     let consume_from_where = ConsumeFromWhere::ConsumeFromLastOffset;

     let consumer_group_info = ConsumerGroupInfo::new(
         group_name.clone(),
         consume_type,
         message_model,
         consume_from_where,
     );

     let mut sub_list = HashSet::new();
     let subscription_data = SubscriptionData {
         topic: "topic".into(),
         sub_string: "sub_string".into(),
+        sub_version: 1,
         ..Default::default()
     };
     sub_list.insert(subscription_data.clone());

     // Test initial subscription
     assert!(consumer_group_info.update_subscription(&sub_list));
+    
+    // Verify subscription was stored correctly
+    let stored_sub = consumer_group_info.find_subscription_data(&"topic".into()).unwrap();
+    assert_eq!(stored_sub.topic, subscription_data.topic);
+    
+    // Test update with higher version
+    let mut updated_sub = subscription_data.clone();
+    updated_sub.sub_version = 2;
+    let mut new_sub_list = HashSet::new();
+    new_sub_list.insert(updated_sub);
+    assert!(consumer_group_info.update_subscription(&new_sub_list));
+    
+    // Test update with lower version (should not update)
+    let mut old_sub = subscription_data;
+    old_sub.sub_version = 0;
+    let mut old_sub_list = HashSet::new();
+    old_sub_list.insert(old_sub);
+    assert!(!consumer_group_info.update_subscription(&old_sub_list));
+    
+    // Test topic removal
+    assert!(consumer_group_info.update_subscription(&HashSet::new()));
+    assert!(consumer_group_info.get_subscribe_topics().is_empty());
 }

Based on the previous search results, I can generate the final response as we have enough context. We found that:

  1. The update_channel method is a critical part of the consumer group functionality, used in both consumer_group_info.rs and consumer_manager.rs
  2. The only test for this functionality is currently commented out
  3. There are no other active tests covering this functionality

Re-enable the commented out test case for update_channel

The test case for update_channel is currently the only test coverage for this critical functionality that manages consumer channel updates. The test appears to be properly structured, testing the basic functionality with valid inputs. Since this method is used in the consumer manager and affects the broker's client handling capabilities, having it untested poses a reliability risk.

  • Re-enable the commented test in rocketmq-broker/src/client/consumer_group_info.rs by removing the comment markers /* and */
🔗 Analysis chain

Line range hint 347-387: Re-enable the commented out test case for update_channel

The test case for update_channel is currently commented out, leaving this critical functionality untested. This method handles important client connection updates and should be properly tested.

Let's verify if this test case is the only one commented out:


Let me verify if there are any other test cases for the update_channel functionality that might be active, and also check the implementation to understand the importance of this test:


Let me check the implementation of update_channel to understand its functionality and verify if it's being tested elsewhere:


Let me try one more search with a different pattern to find the implementation and any related tests:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for commented out test cases in the codebase
rg -U "(/\*|\s*//\s*)\s*#\[test\]" --type rust

Length of output: 354


Script:

#!/bin/bash
# Search for update_channel tests and implementation
rg "update_channel" --type rust -A 5 -B 5

# Also search for the specific test pattern
ast-grep --pattern 'fn update_channel($$$) { $$$ }'

Length of output: 3569


Script:

#!/bin/bash
# Get the implementation of update_channel
ast-grep --pattern 'pub fn update_channel($$$) { $$$ }'

# Check if there are any other tests for this functionality
rg "test.*update.*channel" --type rust

Length of output: 98


Script:

#!/bin/bash
# Search for the update_channel implementation with more context
rg -U "pub fn update_channel.*?\{.*?\}" --multiline --type rust -A 20

# Also check for any integration tests that might be testing this functionality
fd -e rs -t f "test" --exec rg -l "update_channel"

Length of output: 120

rocketmq-broker/src/processor/query_assignment_processor.rs (3)

166-166: Use HashSet::new() instead of HashSet::with_capacity(0)

Using HashSet::new() is more idiomatic and clearer when creating an empty HashSet without pre-allocated capacity.

Apply this diff to improve code clarity:

-                HashSet::with_capacity(0)
+                HashSet::new()

Line range hint 297-302: Verify the condition for pop_share_queue_num

The condition pop_share_queue_num <= 0 || pop_share_queue_num >= cid_all.len() as i32 - 1 may lead to unexpected behavior when pop_share_queue_num equals cid_all.len() as i32 - 1. Please verify if the comparison should be >= cid_all.len() as i32 to prevent potential off-by-one errors.


403-475: Refactor tests to reduce code duplication

The test cases contain repeated setup code for common variables like consumer_group, current_cid, mq_all, and cid_all. Consider creating helper functions or utilizing test fixtures to minimize duplication and improve maintainability.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b0886a1 and fcdc2db.

📒 Files selected for processing (9)
  • rocketmq-broker/src/client/client_channel_info.rs (0 hunks)
  • rocketmq-broker/src/client/consumer_group_info.rs (1 hunks)
  • rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs (0 hunks)
  • rocketmq-broker/src/load_balance/message_request_mode_manager.rs (1 hunks)
  • rocketmq-broker/src/processor.rs (1 hunks)
  • rocketmq-broker/src/processor/end_transaction_processor.rs (1 hunks)
  • rocketmq-broker/src/processor/query_assignment_processor.rs (10 hunks)
  • rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (1 hunks)
  • rocketmq-broker/src/util/hook_utils.rs (0 hunks)
💤 Files with no reviewable changes (3)
  • rocketmq-broker/src/client/client_channel_info.rs
  • rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs
  • rocketmq-broker/src/util/hook_utils.rs
✅ Files skipped from review due to trivial changes (1)
  • rocketmq-broker/src/load_balance/message_request_mode_manager.rs
🔇 Additional comments (6)
rocketmq-broker/src/transaction/queue/default_transactional_message_check_listener.rs (2)

Line range hint 285-289: LGTM: Test case improvements

The test case modifications look good. The test properly verifies empty body handling.


Line range hint 1-304: Verify alignment with QueryAssignmentProcessor objectives

While the code changes look solid, we should verify how this transactional message handling relates to the PR's main objective of adding QueryAssignmentProcessor logic.

rocketmq-broker/src/processor/query_assignment_processor.rs (4)

132-133: Ensure retry topics are processed in Pull mode

The code correctly sets the message request mode to Pull for retry topics starting with RETRY_GROUP_TOPIC_PREFIX. This ensures that retry topics are handled appropriately.


Line range hint 185-199: Properly handle broadcasting consumers in load balancing

The implementation correctly handles broadcasting consumers by retrieving the topic subscription information and returning it. It also logs a warning if no assignment is found.


Line range hint 216-224: Avoid unnecessary warnings for retry topics

When mq_set is None, the code logs a warning unless the topic is a retry topic. This prevents unnecessary warnings for retry topics that may not have subscriptions.


Line range hint 315-319: Ensure correct index calculation in loop

In the loop within allocate_for_pop, the index is incremented and wrapped using modulo. Verify that this logic correctly cycles through cid_all without causing out-of-bounds errors or skipping consumers.

Run the following script to verify the index computations:

✅ Verification successful

Let me gather more context about the index initialization and the loop structure.


Index calculation in loop is safe and correct

The index calculation is properly implemented with no risk of out-of-bounds errors or skipped consumers because:

  1. The index is initialized using position() to find the current consumer's position in cid_all
  2. The loop runs pop_share_queue_num - 1 times, incrementing and wrapping the index using modulo
  3. The modulo operation index %= cid_all.len() ensures the index always stays within bounds
  4. The array access cid_all[index] is safe as index is always < cid_all.len()
  5. The loop only executes when cid_all.len() <= mq_all.len() and pop_share_queue_num < cid_all.len() - 1
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for potential out-of-bounds access or logic errors in index calculation.

# Navigate to the method definition
rg -A 20 'fn allocate_for_pop' rocketmq-broker/src/processor/query_assignment_processor.rs

# Search for index usage and ensure proper bounds
rg -A 5 'index\s*[\+\-]=\s*\d+' rocketmq-broker/src/processor/query_assignment_processor.rs

# Ensure that index is always within 0 and cid_all.len() - 1

Length of output: 1283


Script:

#!/bin/bash
# Get more context around the index usage to see initialization and loop structure
rg -B 10 -A 10 'index\s*\+=\s*1' rocketmq-broker/src/processor/query_assignment_processor.rs

# Also check if there are any other similar index calculations in the file
ast-grep --pattern 'let mut index = $_'

Length of output: 2237

Comment on lines +163 to +167
RequestCode::QueryConsumeQueue | RequestCode::SetMessageRequestMode => {
self.query_assignment_processor
.process_request(channel, ctx, request_code, request)
.await
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Missing implementation for QueryConsumeQueue request code

The QueryAssignmentProcessor's match statement doesn't handle the QueryConsumeQueue request code, despite it being routed to this processor in the BrokerRequestProcessor. This mismatch could lead to the request being silently dropped via the _ => None catch-all case.

  • rocketmq-broker/src/processor/query_assignment_processor.rs: Add a match arm to handle RequestCode::QueryConsumeQueue
  • rocketmq-broker/src/processor.rs: Review whether QueryConsumeQueue should actually be routed to query_assignment_processor
🔗 Analysis chain

LGTM! The implementation follows the established pattern.

The new match arm correctly delegates the QueryConsumeQueue and SetMessageRequestMode request codes to the query_assignment_processor, maintaining consistency with other request handlers.

Let's verify the request code handling implementation:


Let me gather more information about the QueryAssignmentProcessor implementation, specifically focusing on how it handles the QueryConsumeQueue request code.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the QueryAssignmentProcessor implementation and its usage

# Check if QueryAssignmentProcessor properly handles these request codes
ast-grep --pattern 'impl $_ for QueryAssignmentProcessor {
  $$$
  process_request($$$) {
    $$$
    match request_code {
      $$$
    }
    $$$
  }
  $$$
}'

# Verify these request codes are properly defined
rg -A 1 'QueryConsumeQueue|SetMessageRequestMode' 

Length of output: 7562


Script:

#!/bin/bash
# Check QueryAssignmentProcessor's implementation for QueryConsumeQueue handling
rg -A 10 'match request_code \{' rocketmq-broker/src/processor/query_assignment_processor.rs

Length of output: 450

Comment on lines +457 to 460
let msg_ext = MessageExt::default();
//msg_ext.set_body(None);
let msg_inner = end_message_transaction(&msg_ext);
assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

The test end_message_transaction_with_empty_body no longer tests an empty message body

By commenting out msg_ext.set_body(None);, the test end_message_transaction_with_empty_body no longer simulates a message with an empty body. This means the test might not fulfill its intended purpose of ensuring that end_message_transaction handles messages with empty bodies correctly.

To fix this issue, uncomment the line to set the message body to None:

 fn end_message_transaction_with_empty_body() {
     let msg_ext = MessageExt::default();
-    //msg_ext.set_body(None);
+    msg_ext.set_body(None);
     let msg_inner = end_message_transaction(&msg_ext);
     assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty()));
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let msg_ext = MessageExt::default();
//msg_ext.set_body(None);
let msg_inner = end_message_transaction(&msg_ext);
assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty()));
let msg_ext = MessageExt::default();
msg_ext.set_body(None);
let msg_inner = end_message_transaction(&msg_ext);
assert!(!msg_inner.get_body().is_some_and(|b| b.is_empty()));

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] BrokerRequestProcessor add QueryAssignmentProcessor logic handle

4 participants