[ISSUE #790]🚀Support Query message(request code :12,33)🔥#791
Conversation
WalkthroughThe update introduces Changes
Assessment against linked issues
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Actionable comments posted: 5
Outside diff range, codebase verification and nitpick comments (7)
rocketmq-remoting/src/protocol/header.rs (2)
33-34: Add documentation for the new modules.Consider adding documentation comments for the new modules
query_message_request_headerandquery_message_response_headerto improve code readability and maintainability.
38-39: Add documentation for the new modules.Consider adding documentation comments for the new modules
view_message_request_headerandview_message_response_headerto improve code readability and maintainability.rocketmq-store/src/base.rs (1)
31-31: Add documentation for the new module.Consider adding a documentation comment for the new module
query_message_resultto improve code readability and maintainability.rocketmq-store/src/base/query_message_result.rs (3)
22-28: Add documentation for the struct and its fields.Consider adding documentation comments for the
QueryMessageResultstruct and its fields to improve code readability and maintainability.
30-43: Add documentation for the method.Consider adding a documentation comment for the
get_message_datamethod to improve code readability and maintainability.
45-48: Add documentation for the method.Consider adding a documentation comment for the
add_messagemethod to improve code readability and maintainability.rocketmq-store/src/index/index_service.rs (1)
Line range hint
134-167:
Consider adding comments for readability.Adding comments to explain the logic within the
query_offsetfunction would improve readability and maintainability.
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (16)
- rocketmq-broker/src/broker_runtime.rs (4 hunks)
- rocketmq-broker/src/processor.rs (2 hunks)
- rocketmq-broker/src/processor/query_message_processor.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/query_message_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/query_message_response_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/view_message_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/view_message_response_header.rs (1 hunks)
- rocketmq-remoting/src/rpc/topic_request_header.rs (1 hunks)
- rocketmq-store/src/base.rs (1 hunks)
- rocketmq-store/src/base/query_message_result.rs (1 hunks)
- rocketmq-store/src/base/select_result.rs (2 hunks)
- rocketmq-store/src/index/index_service.rs (2 hunks)
- rocketmq-store/src/index/query_offset_result.rs (1 hunks)
- rocketmq-store/src/log_file.rs (2 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (4 hunks)
Additional comments not posted (31)
rocketmq-remoting/src/protocol/header/view_message_response_header.rs (2)
1-16: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
17-22: Struct Definition Approved.The
ViewMessageResponseHeaderstruct is correctly defined and uses appropriate traits.rocketmq-remoting/src/protocol/header/view_message_request_header.rs (2)
1-16: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
17-25: Struct Definition Approved.The
ViewMessageRequestHeaderstruct is correctly defined and uses appropriate traits.rocketmq-remoting/src/protocol/header/query_message_response_header.rs (2)
1-16: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
17-26: Struct Definition Approved.The
QueryMessageResponseHeaderstruct is correctly defined and uses appropriate traits.rocketmq-store/src/index/query_offset_result.rs (2)
Line range hint
1-10: Licensing Information Approved.The licensing information is standard and complies with ASF requirements.
Line range hint
12-43: Struct Definition and Method Approved.The
QueryOffsetResultstruct is correctly defined and the new methodget_phy_offsets_mutis appropriately implemented.rocketmq-store/src/base/select_result.rs (2)
46-50: Approve theget_buffer_slice_mutmethod.The method correctly retrieves a mutable slice of the buffer from the mapped file. Ensure that the mapped file is not null and the buffer access is within bounds.
52-57: Approve theget_bytesmethod.The method correctly converts the buffer to a
Bytesobject if the size is greater than 0 and the mapped file is not null. Ensure that the buffer conversion is handled properly.rocketmq-store/src/log_file.rs (5)
124-131: Approve thequery_messagemethod.The method correctly handles querying messages based on the provided topic, key, max number, and timestamp range. Ensure that the query parameters are validated and the query logic is correct.
133-136: Approve theselect_one_message_by_offsetmethod.The method correctly handles selecting a single message by its offset. Ensure that the offset is validated and the selection logic is correct.
138-142: Approve theselect_one_message_by_offset_with_sizemethod.The method correctly handles selecting a single message by its offset and size. Ensure that the offset and size are validated and the selection logic is correct.
144-144: Approve thelook_message_by_offsetmethod.The method correctly handles looking up a message by its offset. Ensure that the offset is validated and the lookup logic is correct.
146-150: Approve thelook_message_by_offset_with_sizemethod.The method correctly handles looking up a message by its offset and size. Ensure that the offset and size are validated and the lookup logic is correct.
rocketmq-broker/src/processor/query_message_processor.rs (5)
32-35: Approve theQueryMessageProcessorstruct.The struct is correctly defined and the fields are properly initialized. Ensure that the generic type
MSis correctly constrained.
38-43: Approve thenewmethod.The method correctly initializes a new instance of
QueryMessageProcessorwith the providedmessage_store_configandmessage_store.
50-62: Approve theprocess_requestmethod.The method correctly handles processing requests based on the request code. Ensure that the request code is validated and the request processing logic is correct.
64-110: Approve thequery_messagemethod.The method correctly handles querying messages based on the request parameters. Ensure that the query parameters are validated and the query logic is correct.
113-138: Approve theview_message_by_idmethod.The method correctly handles viewing a message by its ID based on the request parameters. Ensure that the request parameters are validated and the view logic is correct.
rocketmq-remoting/src/protocol/header/query_message_request_header.rs (6)
24-38: Approve theQueryMessageRequestHeaderstruct.The struct is correctly defined and the fields are properly initialized. Ensure that the nested
TopicRequestHeaderis correctly handled.
55-75: Approve theto_mapmethod.The method correctly handles converting the struct to a map. Ensure that all fields are included in the map and the conversion logic is correct.
78-91: Approve thefrommethod.The method correctly handles creating an instance of the struct from a map. Ensure that all required fields are present in the map and the creation logic is correct.
100-115: Approve thecreating_from_map_with_all_fields_populates_struct_correctlytest.The test correctly verifies creating an instance of the struct from a map with all fields populated. Ensure that all field values are correctly checked.
118-131: Approve thecreating_from_map_missing_optional_fields_still_succeedstest.The test correctly verifies creating an instance of the struct from a map with missing optional fields. Ensure that all required fields are correctly checked.
133-143: Approve thecreating_from_map_with_invalid_number_fields_returns_nonetest.The test correctly verifies creating an instance of the struct from a map with invalid number fields. Ensure that all invalid field values are correctly checked.
rocketmq-store/src/index/index_service.rs (1)
134-137: Good use of&strfor parameters.Changing
topicandkeyto&strimproves performance by avoiding unnecessary allocations.rocketmq-broker/src/processor.rs (1)
165-169: Good addition for handling new request codes.The handling of
RequestCode::QueryMessageandRequestCode::ViewMessageByIdis correctly added.rocketmq-broker/src/broker_runtime.rs (1)
434-435: Correct initialization ofQueryMessageProcessor.The
QueryMessageProcessoris correctly initialized with the necessary configurations.rocketmq-store/src/message_store/default_message_store.rs (2)
1158-1164: LGTM!The method is correctly implemented.
1175-1190: LGTM!The method is correctly implemented.
| if let Some(value) = self.rpc_request_header.as_ref() { | ||
| if let Some(rpc_map) = value.to_map() { | ||
| map.extend(rpc_map); | ||
| } |
There was a problem hiding this comment.
Improve error handling in to_map method.
Consider adding error handling for potential issues when converting rpc_request_header to a map. This can help prevent unexpected behavior if the conversion fails.
if let Some(value) = self.rpc_request_header.as_ref() {
if let Some(rpc_map) = value.to_map() {
map.extend(rpc_map);
} else {
// Handle the error case where rpc_map is None
// e.g., log an error or return an error result
}
}| async fn query_message( | ||
| &self, | ||
| topic: &str, | ||
| key: &str, | ||
| max_num: i32, | ||
| begin_timestamp: i64, | ||
| end_timestamp: i64, | ||
| ) -> Option<QueryMessageResult> { | ||
| let mut query_message_result = QueryMessageResult::default(); | ||
| let mut last_query_msg_time = end_timestamp; | ||
| for i in 1..3 { | ||
| let mut query_offset_result = self.index_service.query_offset( | ||
| topic, | ||
| key, | ||
| max_num, | ||
| begin_timestamp, | ||
| end_timestamp, | ||
| ); | ||
| if query_offset_result.get_phy_offsets().is_empty() { | ||
| break; | ||
| } | ||
|
|
||
| query_offset_result.get_phy_offsets_mut().sort(); | ||
|
|
||
| query_message_result.index_last_update_timestamp = | ||
| query_offset_result.get_index_last_update_timestamp(); | ||
| query_message_result.index_last_update_phyoffset = | ||
| query_offset_result.get_index_last_update_phyoffset(); | ||
| let phy_offsets = query_offset_result.get_phy_offsets(); | ||
| for m in 0..phy_offsets.len() { | ||
| let offset = *phy_offsets.get(m).unwrap(); | ||
| let msg = self.look_message_by_offset(offset); | ||
| if m == 0 { | ||
| last_query_msg_time = msg.as_ref().unwrap().store_timestamp; | ||
| } | ||
| let result = self.commit_log.get_data_with_option(offset, false); | ||
| if let Some(sbr) = result { | ||
| query_message_result.add_message(sbr); | ||
| } | ||
| } | ||
| if query_message_result.buffer_total_size > 0 { | ||
| break; | ||
| } | ||
| if last_query_msg_time < begin_timestamp { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| Some(query_message_result) | ||
| } |
There was a problem hiding this comment.
Improve error handling and justify loop limit.
- Error handling for
look_message_by_offsetandget_data_with_optionshould be improved to handle cases where these methods returnNone. - The loop limit of 3 iterations seems arbitrary. Consider making this limit configurable or providing a justification for this choice.
for i in 1..3 {
let mut query_offset_result = self.index_service.query_offset(
topic,
key,
max_num,
begin_timestamp,
end_timestamp,
);
if query_offset_result.get_phy_offsets().is_empty() {
break;
}
query_offset_result.get_phy_offsets_mut().sort();
query_message_result.index_last_update_timestamp =
query_offset_result.get_index_last_update_timestamp();
query_message_result.index_last_update_phyoffset =
query_offset_result.get_index_last_update_phyoffset();
let phy_offsets = query_offset_result.get_phy_offsets();
for m in 0..phy_offsets.len() {
let offset = *phy_offsets.get(m).unwrap();
let msg = self.look_message_by_offset(offset);
if msg.is_none() {
continue; // Handle None case
}
if m == 0 {
last_query_msg_time = msg.as_ref().unwrap().store_timestamp;
}
let result = self.commit_log.get_data_with_option(offset, false);
if let Some(sbr) = result {
query_message_result.add_message(sbr);
}
}
if query_message_result.buffer_total_size > 0 {
break;
}
if last_query_msg_time < begin_timestamp {
break;
}
}Improve error handling and justify loop limit.
- Error handling for
look_message_by_offsetandget_data_with_optionshould be improved to handle cases where these methods returnNone. - The loop limit of 3 iterations seems arbitrary. Consider making this limit configurable or providing a justification for this choice.
for i in 1..3 {
let mut query_offset_result = self.index_service.query_offset(
topic,
key,
max_num,
begin_timestamp,
end_timestamp,
);
if query_offset_result.get_phy_offsets().is_empty() {
break;
}
query_offset_result.get_phy_offsets_mut().sort();
query_message_result.index_last_update_timestamp =
query_offset_result.get_index_last_update_timestamp();
query_message_result.index_last_update_phyoffset =
query_offset_result.get_index_last_update_phyoffset();
let phy_offsets = query_offset_result.get_phy_offsets();
for m in 0..phy_offsets.len() {
let offset = *phy_offsets.get(m).unwrap();
let msg = self.look_message_by_offset(offset);
if msg.is_none() {
continue; // Handle None case
}
if m == 0 {
last_query_msg_time = msg.as_ref().unwrap().store_timestamp;
}
let result = self.commit_log.get_data_with_option(offset, false);
if let Some(sbr) = result {
query_message_result.add_message(sbr);
}
}
if query_message_result.buffer_total_size > 0 {
break;
}
if last_query_msg_time < begin_timestamp {
break;
}
}| async fn select_one_message_by_offset( | ||
| &self, | ||
| commit_log_offset: i64, | ||
| ) -> Option<SelectMappedBufferResult> { | ||
| let sbr = self.commit_log.get_message(commit_log_offset, 4); | ||
| if let Some(sbr) = sbr { | ||
| let size = sbr.get_buffer().get_i32(); | ||
| self.commit_log.get_message(commit_log_offset, size) | ||
| } else { | ||
| None | ||
| } | ||
| } |
There was a problem hiding this comment.
Optimize message retrieval.
Fetching the message size separately and then fetching the complete message can be optimized by combining these operations.
async fn select_one_message_by_offset(
&self,
commit_log_offset: i64,
) -> Option<SelectMappedBufferResult> {
if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) {
let size = sbr.get_buffer().get_i32();
self.commit_log.get_message(commit_log_offset, size)
} else {
None
}
}| fn look_message_by_offset(&self, commit_log_offset: i64) -> Option<MessageExt> { | ||
| if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) { | ||
| let size = sbr.get_buffer().get_i32(); | ||
| self.look_message_by_offset_with_size(commit_log_offset, size) | ||
| } else { | ||
| None | ||
| } | ||
| } |
There was a problem hiding this comment.
Optimize message retrieval.
Fetching the message size separately and then fetching the complete message can be optimized by combining these operations.
fn look_message_by_offset(&self, commit_log_offset: i64) -> Option<MessageExt> {
if let Some(sbr) = self.commit_log.get_message(commit_log_offset, 4) {
let size = sbr.get_buffer().get_i32();
self.look_message_by_offset_with_size(commit_log_offset, size)
} else {
None
}
}
Which Issue(s) This PR Fixes(Closes)
Fixes #790
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
QueryMessageProcessorto enhance message querying and viewing functionalities.Enhancements
QueryMessageProcessorwith generic types.QueryOffsetResultto include mutable access to physical offsets.Bug Fixes
query_offsetto improve query efficiency.Code Improvements