[ISSUE #702]🚀Support pull message result handle-2🚀#706
Conversation
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
|
Warning Rate limit exceeded@mxsm has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 52 minutes and 35 seconds before requesting another review. How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughThe changes introduce functionalities to support enhanced message pulling and offset management in RocketMQ's broker processor. This includes adding fields and methods to various structs for managing broadcast offsets and consumer offsets, enabling methods for incrementing statistics, and modifying method signatures and logic for message encoding and response handling. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant BrokerRuntime
participant BroadcastOffsetManager
participant ConsumerManager
client ->> BrokerRuntime: Pull Message Request
BrokerRuntime ->> BroadcastOffsetManager: Update Offset
BrokerRuntime ->> ConsumerManager: Commit Offset
BrokerRuntime ->> Client: Return Message
Assessment against linked issues
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Actionable comments posted: 4
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (10)
- rocketmq-broker/src/broker_runtime.rs (4 hunks)
- rocketmq-broker/src/offset/manager/broadcast_offset_manager.rs (1 hunks)
- rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (1 hunks)
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs (5 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (3 hunks)
- rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
- rocketmq-remoting/src/protocol/request_source.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_client_utils.rs (3 hunks)
- rocketmq-store/src/base/get_message_result.rs (1 hunks)
- rocketmq-store/src/stats/broker_stats_manager.rs (1 hunks)
Additional comments not posted (10)
rocketmq-remoting/src/protocol/request_source.rs (1)
54-54: Good enhancement toRequestSourceusability.Making the
from_valuefunction public enhances the usability of theRequestSourceenum by allowing external modules to directly utilize this conversion functionality.rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (1)
183-198: Proper implementation ofcommit_pull_offset.The new method
commit_pull_offsetcorrectly updates the pull offset table, aligning with the PR's objectives to enhance message handling.rocketmq-store/src/base/get_message_result.rs (1)
159-161: Useful addition toGetMessageResult.The method
message_mapped_listprovides a clean, read-only access to the internal message mappings, which enhances the usability of theGetMessageResultclass.rocketmq-common/src/common/broker/broker_config.rs (1)
157-157: Addition oftransfer_msg_by_heapfield inBrokerConfigThe new boolean field
transfer_msg_by_heaphas been added to theBrokerConfigstruct and is set totrueby default in thedefaultimplementation. Ensure that this default value aligns with the intended use cases and performance expectations, especially considering that it affects how messages are transferred (potentially impacting memory usage and performance).Also applies to: 223-223
rocketmq-broker/src/processor/default_pull_message_result_handler.rs (1)
56-59: Proper Initialization of New Fields inDefaultPullMessageResultHandlerThe new fields
consumer_offset_manager,consumer_manager,broadcast_offset_manager, andbroker_stats_managerare properly initialized in the constructor ofDefaultPullMessageResultHandler. This ensures that the handler has all necessary components to manage message pulling effectively.Also applies to: 76-79
rocketmq-broker/src/processor/pull_message_processor.rs (1)
798-810: Addition ofis_broadcastUtility FunctionThe
is_broadcastfunction has been added to determine if a message should be treated as a broadcast based on the consumer group info. This utility function enhances the modularity and readability of the code. Make sure it is used consistently wherever applicable.rocketmq-broker/src/broker_runtime.rs (4)
128-128: Review cloning ofbroadcast_offset_manager.Adding
broadcast_offset_managerto theCloneimplementation ofBrokerRuntimeis essential to ensure that each clone of the runtime has its own reference to the shared manager. This is correctly implemented usingArc::clone().
201-201: Initialization ofbroadcast_offset_managerinBrokerRuntimeconstructor.The initialization of
broadcast_offset_managerusingArc::new(Default::default())in the constructor is appropriate. This ensures that the manager is ready for use immediately afterBrokerRuntimeinstantiation.
367-370: Correct integration ofbroadcast_offset_managerinDefaultPullMessageResultHandler.The integration of
broadcast_offset_managerinDefaultPullMessageResultHandlerwithin theinit_processormethod ensures that it is available for message processing. This aligns with the PR's objective to enhance message handling capabilities.
97-97: Ensure proper handling ofbroadcast_offset_managerinitialization.The addition of
broadcast_offset_managertoBrokerRuntimeis a significant change. It is crucial to ensure that this manager is properly initialized to avoid runtime errors or uninitialized access.Verification successful
Initialization of
broadcast_offset_manageris properly handled.The
broadcast_offset_manageris initialized usingArc::new(Default::default()), ensuring it is properly set up in theBrokerRuntimestruct.
rocketmq-broker/src/broker_runtime.rs:
broadcast_offset_manager: Arc::new(Default::default())Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Search for the initialization of `broadcast_offset_manager` in the constructor to ensure it is properly handled. # Test: Search for the initialization of `broadcast_offset_manager`. Expect: Proper initialization in the constructor. rg --type rust $'broadcast_offset_manager: Arc::new(Default::default())' rocketmq-broker/src/broker_runtime.rsLength of output: 111
Script:
#!/bin/bash # Description: Search for the initialization of `broadcast_offset_manager` in the BrokerRuntime struct to ensure it is properly handled. # Test: Search for the initialization of `broadcast_offset_manager` in the BrokerRuntime struct. rg --type rust 'broadcast_offset_manager' rocketmq-broker/src/broker_runtime.rs # Test: Search for the constructor of the BrokerRuntime struct to verify initialization. rg --type rust 'fn new' rocketmq-broker/src/broker_runtime.rsLength of output: 499
| pub fn update_offset( | ||
| &self, | ||
| topic: &str, | ||
| group: &str, | ||
| queue_id: i32, | ||
| offset: i64, | ||
| client_id: &str, | ||
| from_proxy: bool, | ||
| ) { | ||
| unimplemented!() | ||
| } |
There was a problem hiding this comment.
Implementation needed for update_offset method.
The method update_offset is currently marked as unimplemented!(). This suggests that the functionality is not yet complete.
Do you have a timeline for implementing this? It might be beneficial to add a TODO comment or create a tracking issue to ensure this does not get overlooked.
| self.update_broadcast_pulled_offset( | ||
| request_header.topic.as_str(), | ||
| request_header.consumer_group.as_str(), | ||
| request_header.queue_id.unwrap(), | ||
| &request_header, | ||
| &channel, | ||
| Some(&response), | ||
| get_message_result.next_begin_offset(), | ||
| ); |
There was a problem hiding this comment.
Complex Logic in update_broadcast_pulled_offset and try_commit_offset Methods
The methods update_broadcast_pulled_offset and try_commit_offset contain complex logic for managing offsets which might be hard to maintain. Consider simplifying or refactoring these methods to improve readability and maintainability.
- Existing complex methods
+ Proposed simplified versions (details depend on further context)Also applies to: 143-147
| pub fn inc_topic_put_nums(&self, topic: &str, num: i32, times: i32) {} | ||
|
|
||
| pub fn inc_topic_put_size(&self, topic: &str, size: i32) {} | ||
|
|
||
| pub fn inc_group_get_nums(&self, group: &str, topic: &str, inc_value: i32) {} | ||
| pub fn inc_group_get_size(&self, group: &str, topic: &str, inc_value: i32) {} | ||
|
|
||
| pub fn inc_group_ck_nums(&self, group: &str, topic: &str, inc_value: i32) {} | ||
|
|
||
| pub fn inc_group_ack_nums(&self, group: &str, topic: &str, inc_value: i32) {} | ||
| pub fn inc_broker_get_nums(&self, group: &str, inc_value: i32) {} |
There was a problem hiding this comment.
Tip
Codebase Verification
Ensure Usage of New Statistics Increment Methods
The methods inc_topic_put_nums, inc_topic_put_size, inc_group_ck_nums, and inc_group_ack_nums are defined but not used anywhere in the codebase. Ensure these methods are invoked where necessary to maintain accurate statistics.
- File:
rocketmq-store/src/stats/broker_stats_manager.rs- Methods:
inc_topic_put_numsinc_topic_put_sizeinc_group_ck_numsinc_group_ack_nums
- Methods:
Analysis chain
Implementation of Statistics Increment Methods
New methods for incrementing various statistics (inc_topic_put_nums, inc_topic_put_size, inc_group_get_nums, inc_group_get_size, inc_group_ck_nums, inc_group_ack_nums, inc_broker_get_nums) have been added. Ensure that these methods are called appropriately within the system to reflect accurate statistics.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify if the new statistics methods are used appropriately throughout the codebase.
# Test: Search for the usage of the new statistics methods. Expect: Usages in relevant parts of the codebase.
rg --type rust $'inc_'
Length of output: 2995
Script:
#!/bin/bash
# Description: Verify if the remaining new statistics methods are used appropriately throughout the codebase.
# Test: Search for the usage of the specific new statistics methods that were not found in the initial search.
rg --type rust -e 'inc_topic_put_nums' -e 'inc_topic_put_size' -e 'inc_group_ck_nums' -e 'inc_group_ack_nums'
Length of output: 601
| pub fn rewrite_response_for_static_topic( | ||
| request_header: &PullMessageRequestHeader, | ||
| response_header: &mut PullMessageResponseHeader, | ||
| mapping_context: &mut TopicQueueMappingContext, | ||
| code: ResponseCode, | ||
| ) -> Option<RemotingCommand> { | ||
| mapping_context.mapping_detail.as_ref()?; | ||
| let mapping_detail = mapping_context.mapping_detail.as_ref().unwrap(); | ||
| let leader_item = mapping_context.leader_item.as_ref().unwrap(); | ||
| let current_item = mapping_context.current_item.as_ref().unwrap(); | ||
| let mapping_items = &mut mapping_context.mapping_item_list; | ||
| let earlist_item = | ||
| TopicQueueMappingUtils::find_logic_queue_mapping_item(mapping_items, 0, true).unwrap(); | ||
|
|
||
| assert!(current_item.logic_offset >= 0); | ||
|
|
||
| let request_offset = request_header.queue_offset; | ||
| let mut next_begin_offset = response_header.next_begin_offset; | ||
| let mut min_offset = response_header.min_offset; | ||
| let mut max_offset = response_header.max_offset; | ||
| let mut response_code = code; | ||
|
|
||
| if code != ResponseCode::Success { | ||
| let mut is_revised = false; | ||
| if leader_item.gen == current_item.gen { | ||
| if request_offset > max_offset.unwrap() { | ||
| if code == ResponseCode::PullOffsetMoved { | ||
| response_code = ResponseCode::PullOffsetMoved; | ||
| next_begin_offset = max_offset; | ||
| } else { | ||
| response_code = code; | ||
| } | ||
| } else if request_offset < min_offset.unwrap() { | ||
| next_begin_offset = min_offset; | ||
| response_code = ResponseCode::PullRetryImmediately; | ||
| } else { | ||
| response_code = code; | ||
| } | ||
| } | ||
|
|
||
| if earlist_item.gen == current_item.gen { | ||
| if request_offset < min_offset.unwrap() { | ||
| /*if code == ResponseCode::PullOffsetMoved { | ||
| response_code = ResponseCode::PullOffsetMoved; | ||
| next_begin_offset = min_offset; | ||
| } else { | ||
| response_code = ResponseCode::PullOffsetMoved; | ||
| next_begin_offset = min_offset; | ||
| }*/ | ||
| if earlist_item.gen == current_item.gen { | ||
| if request_offset < min_offset.unwrap() { | ||
| /*if code == ResponseCode::PullOffsetMoved { | ||
| response_code = ResponseCode::PullOffsetMoved; | ||
| next_begin_offset = min_offset; | ||
| } else if request_offset >= max_offset.unwrap() { | ||
| if let Some(next_item) = | ||
| TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) | ||
| { | ||
| is_revised = true; | ||
| next_begin_offset = Some(next_item.start_offset); | ||
| min_offset = Some(next_item.start_offset); | ||
| max_offset = min_offset; | ||
| response_code = ResponseCode::PullRetryImmediately; | ||
| } else { | ||
| response_code = ResponseCode::PullNotFound; | ||
| } | ||
| } else { | ||
| response_code = code; | ||
| } | ||
| } | ||
|
|
||
| if !is_revised | ||
| && leader_item.gen != current_item.gen | ||
| && earlist_item.gen != current_item.gen | ||
| { | ||
| if request_offset < min_offset? { | ||
| response_code = ResponseCode::PullOffsetMoved; | ||
| next_begin_offset = min_offset; | ||
| }*/ | ||
| response_code = ResponseCode::PullOffsetMoved; | ||
| next_begin_offset = min_offset; | ||
| } else if request_offset >= max_offset.unwrap() { | ||
| if let Some(next_item) = | ||
| TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) | ||
| { | ||
| is_revised = true; | ||
| next_begin_offset = Some(next_item.start_offset); | ||
| min_offset = Some(next_item.start_offset); | ||
| max_offset = min_offset; | ||
| response_code = ResponseCode::PullRetryImmediately; | ||
| } else if request_offset >= max_offset? { | ||
| if let Some(next_item) = | ||
| TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) | ||
| { | ||
| next_begin_offset = Some(next_item.start_offset); | ||
| min_offset = Some(next_item.start_offset); | ||
| max_offset = min_offset; | ||
| response_code = ResponseCode::PullRetryImmediately; | ||
| } else { | ||
| response_code = ResponseCode::PullNotFound; | ||
| } | ||
| } else { | ||
| response_code = code; | ||
| response_code = ResponseCode::PullNotFound; | ||
| } | ||
| } else { | ||
| response_code = code; | ||
| } | ||
| } | ||
|
|
||
| if current_item.check_if_end_offset_decided() | ||
| && next_begin_offset.unwrap() >= current_item.end_offset | ||
| if !is_revised | ||
| && leader_item.gen != current_item.gen | ||
| && earlist_item.gen != current_item.gen | ||
| { | ||
| next_begin_offset = Some(current_item.end_offset); | ||
| if request_offset < min_offset? { | ||
| next_begin_offset = min_offset; | ||
| response_code = ResponseCode::PullRetryImmediately; | ||
| } else if request_offset >= max_offset? { | ||
| if let Some(next_item) = | ||
| TopicQueueMappingUtils::find_next(mapping_items, Some(current_item), true) | ||
| { | ||
| next_begin_offset = Some(next_item.start_offset); | ||
| min_offset = Some(next_item.start_offset); | ||
| max_offset = min_offset; | ||
| response_code = ResponseCode::PullRetryImmediately; | ||
| } else { | ||
| response_code = ResponseCode::PullNotFound; | ||
| } | ||
| } else { | ||
| response_code = code; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| response_header.next_begin_offset = | ||
| Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap())); | ||
| response_header.min_offset = Some(current_item.compute_static_queue_offset_strictly( | ||
| if current_item.check_if_end_offset_decided() | ||
| && next_begin_offset.unwrap() >= current_item.end_offset | ||
| { | ||
| next_begin_offset = Some(current_item.end_offset); | ||
| } | ||
|
|
||
| response_header.next_begin_offset = | ||
| Some(current_item.compute_static_queue_offset_strictly(next_begin_offset.unwrap())); | ||
| response_header.min_offset = | ||
| Some(current_item.compute_static_queue_offset_strictly( | ||
| min_offset.unwrap().max(current_item.start_offset), | ||
| )); | ||
| response_header.max_offset = Some( | ||
| current_item | ||
| .compute_static_queue_offset_strictly(max_offset.unwrap()) | ||
| .max(TopicQueueMappingDetail::compute_max_offset_from_mapping( | ||
| mapping_detail, | ||
| mapping_context.global_id, | ||
| )), | ||
| ); | ||
| response_header.offset_delta = Some(current_item.compute_offset_delta()); | ||
|
|
||
| if code != ResponseCode::Success { | ||
| Some( | ||
| RemotingCommand::create_response_command_with_header(response_header.clone()) | ||
| .set_code(response_code), | ||
| ) | ||
| } else { | ||
| None | ||
| } | ||
| response_header.max_offset = Some( | ||
| current_item | ||
| .compute_static_queue_offset_strictly(max_offset.unwrap()) | ||
| .max(TopicQueueMappingDetail::compute_max_offset_from_mapping( | ||
| mapping_detail, | ||
| mapping_context.global_id, | ||
| )), | ||
| ); | ||
| response_header.offset_delta = Some(current_item.compute_offset_delta()); | ||
|
|
||
| if code != ResponseCode::Success { | ||
| Some( | ||
| RemotingCommand::create_response_command_with_header(response_header.clone()) | ||
| .set_code(response_code), | ||
| ) | ||
| } else { | ||
| None |
There was a problem hiding this comment.
Tip
Codebase Verification
Lack of Direct Tests for rewrite_response_for_static_topic Function
The rewrite_response_for_static_topic function is not directly covered by unit tests in the relevant files. This lack of direct testing poses a risk for potential regressions or unintended behavior changes. Please add comprehensive unit tests to ensure the function handles all edge cases and error conditions correctly.
- Files to review:
rocketmq-broker/src/processor/consumer_manage_processor.rsrocketmq-broker/src/processor/default_pull_message_result_handler.rsrocketmq-broker/src/processor/pull_message_processor.rs
Analysis chain
Refactor of rewrite_response_for_static_topic Function
The rewrite_response_for_static_topic function has been significantly refactored to handle various response codes and offsets. Ensure that this refactoring does not introduce any regressions or unintended behavior changes, especially in how it handles edge cases and error conditions.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the correct behavior of the refactored function through unit tests.
# Test: Search for unit tests covering the `rewrite_response_for_static_topic` function. Expect: Comprehensive tests covering all branches.
rg --type rust -A 3 'fn test_rewrite_response_for_static_topic'
Length of output: 65
Script:
#!/bin/bash
# Description: Verify if `rewrite_response_for_static_topic` is referenced in any test files.
# Search for references to `rewrite_response_for_static_topic` in the codebase
rg 'rewrite_response_for_static_topic' --type rust
# Find all test files in the repository
fd --type f --extension rs --exec rg 'rewrite_response_for_static_topic' {}
Length of output: 1437
Script:
#!/bin/bash
# Description: Verify the presence of test functions in files where `rewrite_response_for_static_topic` is referenced.
# List of files where `rewrite_response_for_static_topic` is referenced
files=(
"rocketmq-broker/src/processor/consumer_manage_processor.rs"
"rocketmq-broker/src/processor/default_pull_message_result_handler.rs"
"rocketmq-broker/src/processor/pull_message_processor.rs"
)
# Search for test functions in the listed files
for file in "${files[@]}"; do
echo "Searching for test functions in $file"
rg --type rust 'fn test_' "$file"
done
Length of output: 1138
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
Which Issue(s) This PR Fixes(Closes)
Fixes #702
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Enhancements
Configuration
transfer_msg_by_heapinBrokerConfig.API Changes
BroadcastOffsetManager,ConsumerOffsetManager,GetMessageResult, andBrokerStatsManager.from_valuefunction inRequestSourcepublic for better accessibility.