Conversation
WalkthroughThe changes optimize RocketMQ's message-pulling process. A new boolean flag Changes
Sequence DiagramsequenceDiagram
participant Client
participant BrokerRuntime
participant DefaultMessageStore
participant ReputMessageService
Note over Client: Enhanced Pull Message Flow
Client->>BrokerRuntime: Request messages
BrokerRuntime->>DefaultMessageStore: Fetch messages (with notify_message_arrive_in_batch)
DefaultMessageStore-->>ReputMessageService: Notify message arrival
ReputMessageService-->>DefaultMessageStore: Batch process based on notify_message_arrive_in_batch
DefaultMessageStore->>BrokerRuntime: Return messages
BrokerRuntime->>Client: Deliver messages
Assessment against linked issues
Poem
Tip AI model upgrade
|
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Actionable comments posted: 1
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (3)
- rocketmq-broker/src/broker_runtime.rs (1 hunks)
- rocketmq-common/src/common/mix_all.rs (2 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (10 hunks)
Additional comments not posted (14)
rocketmq-common/src/common/mix_all.rs (4)
104-109: Functionis_lmqlooks good.The function correctly checks if the provided metadata starts with the LMQ prefix. The use of
Optionis appropriate for handling possibleNonevalues.
178-182: Testreturns_true_for_lmq_prefixed_metadatalooks good.The test correctly verifies that the function returns
truefor metadata starting with the LMQ prefix.
184-188: Testreturns_false_for_non_lmq_prefixed_metadatalooks good.The test correctly verifies that the function returns
falsefor metadata not starting with the LMQ prefix.
190-193: Testreturns_false_for_none_metadatalooks good.The test correctly verifies that the function returns
falseforNonemetadata.rocketmq-broker/src/broker_runtime.rs (1)
298-298: Ensure the new boolean argument is correctly utilized.The new boolean argument
falsehas been added to theDefaultMessageStore::newcall. Ensure that this argument is correctly utilized within theDefaultMessageStoreimplementation.rocketmq-store/src/message_store/default_message_store.rs (9)
35-38: Imports are appropriate.The new imports are relevant and necessary for the added functionality.
116-116: New fieldnotify_message_arrive_in_batchadded toDefaultMessageStore.This field will likely be used to control batch notifications for message arrival.
144-144: Clone implementation updated to include new field.The
Cloneimplementation correctly includes the newnotify_message_arrive_in_batchfield.
155-155: Constructor updated to initialize new field.The
notify_message_arrive_in_batchfield is correctly initialized in the constructor.
585-586:startmethod updated to pass new field.The
notify_message_arrive_in_batchfield is correctly passed to theReputMessageService.
1037-1038: New fieldnotify_message_arrive_in_batchadded toReputMessageService.This field will likely be used to control batch notifications for message arrival.
1037-1038:startmethod updated to pass new field.The
notify_message_arrive_in_batchfield is correctly passed to theReputMessageServiceInner.Also applies to: 1045-1046
1106-1107: New fieldnotify_message_arrive_in_batchadded toReputMessageServiceInner.This field will likely be used to control batch notifications for message arrival.
1218-1221:do_reputmethod updated to use new field.The
notify_message_arrive_in_batchfield is correctly used to control message arrival notifications.
| fn notify_message_arrive4multi_queue(&self, dispatch_request: &mut DispatchRequest) { | ||
| let prop = dispatch_request.properties_map.as_ref(); | ||
| if prop.is_none() || dispatch_request.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { | ||
| return; | ||
| } | ||
| let prop = prop.unwrap(); | ||
| let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH); | ||
| let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET); | ||
| if multi_dispatch_queue.is_none() | ||
| || multi_queue_offset.is_none() | ||
| || multi_dispatch_queue.as_ref().unwrap().is_empty() | ||
| || multi_queue_offset.as_ref().unwrap().is_empty() | ||
| { | ||
| return; | ||
| } | ||
| let queues: Vec<&str> = multi_dispatch_queue | ||
| .unwrap() | ||
| .split(MULTI_DISPATCH_QUEUE_SPLITTER) | ||
| .collect(); | ||
| let queue_offsets: Vec<&str> = multi_queue_offset | ||
| .unwrap() | ||
| .split(MULTI_DISPATCH_QUEUE_SPLITTER) | ||
| .collect(); | ||
| if queues.len() != queue_offsets.len() { | ||
| return; | ||
| } | ||
| for i in 0..queues.len() { | ||
| let queue_name = queues[i]; | ||
| let queue_offset: i64 = queue_offsets[i].parse().unwrap(); | ||
| let mut queue_id = dispatch_request.queue_id; | ||
| if self.message_store_config.enable_lmq && is_lmq(Some(queue_name)) { | ||
| queue_id = 0; | ||
| } | ||
| self.message_store | ||
| .message_arriving_listener | ||
| .as_ref() | ||
| .unwrap() | ||
| .arriving( | ||
| queue_name, | ||
| queue_id, | ||
| queue_offset + 1, | ||
| Some(dispatch_request.tags_code), | ||
| dispatch_request.store_timestamp, | ||
| dispatch_request.bit_map.clone(), | ||
| dispatch_request.properties_map.as_ref(), | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
Handle unwrap calls safely in notify_message_arrive4multi_queue.
The unwrap calls can potentially panic if the values are not present. Consider using safe handling methods like if let or match.
- let prop = prop.unwrap();
- let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH);
- let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+ if let Some(prop) = prop {
+ let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH);
+ let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn notify_message_arrive4multi_queue(&self, dispatch_request: &mut DispatchRequest) { | |
| let prop = dispatch_request.properties_map.as_ref(); | |
| if prop.is_none() || dispatch_request.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { | |
| return; | |
| } | |
| let prop = prop.unwrap(); | |
| let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH); | |
| let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET); | |
| if multi_dispatch_queue.is_none() | |
| || multi_queue_offset.is_none() | |
| || multi_dispatch_queue.as_ref().unwrap().is_empty() | |
| || multi_queue_offset.as_ref().unwrap().is_empty() | |
| { | |
| return; | |
| } | |
| let queues: Vec<&str> = multi_dispatch_queue | |
| .unwrap() | |
| .split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
| .collect(); | |
| let queue_offsets: Vec<&str> = multi_queue_offset | |
| .unwrap() | |
| .split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
| .collect(); | |
| if queues.len() != queue_offsets.len() { | |
| return; | |
| } | |
| for i in 0..queues.len() { | |
| let queue_name = queues[i]; | |
| let queue_offset: i64 = queue_offsets[i].parse().unwrap(); | |
| let mut queue_id = dispatch_request.queue_id; | |
| if self.message_store_config.enable_lmq && is_lmq(Some(queue_name)) { | |
| queue_id = 0; | |
| } | |
| self.message_store | |
| .message_arriving_listener | |
| .as_ref() | |
| .unwrap() | |
| .arriving( | |
| queue_name, | |
| queue_id, | |
| queue_offset + 1, | |
| Some(dispatch_request.tags_code), | |
| dispatch_request.store_timestamp, | |
| dispatch_request.bit_map.clone(), | |
| dispatch_request.properties_map.as_ref(), | |
| ); | |
| } | |
| } | |
| fn notify_message_arrive4multi_queue(&self, dispatch_request: &mut DispatchRequest) { | |
| let prop = dispatch_request.properties_map.as_ref(); | |
| if prop.is_none() || dispatch_request.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) { | |
| return; | |
| } | |
| if let Some(prop) = prop { | |
| let multi_dispatch_queue = prop.get(MessageConst::PROPERTY_INNER_MULTI_DISPATCH); | |
| let multi_queue_offset = prop.get(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET); | |
| if multi_dispatch_queue.is_none() | |
| || multi_queue_offset.is_none() | |
| || multi_dispatch_queue.as_ref().unwrap().is_empty() | |
| || multi_queue_offset.as_ref().unwrap().is_empty() | |
| { | |
| return; | |
| } | |
| let queues: Vec<&str> = multi_dispatch_queue | |
| .unwrap() | |
| .split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
| .collect(); | |
| let queue_offsets: Vec<&str> = multi_queue_offset | |
| .unwrap() | |
| .split(MULTI_DISPATCH_QUEUE_SPLITTER) | |
| .collect(); | |
| if queues.len() != queue_offsets.len() { | |
| return; | |
| } | |
| for i in 0..queues.len() { | |
| let queue_name = queues[i]; | |
| let queue_offset: i64 = queue_offsets[i].parse().unwrap(); | |
| let mut queue_id = dispatch_request.queue_id; | |
| if self.message_store_config.enable_lmq && is_lmq(Some(queue_name)) { | |
| queue_id = 0; | |
| } | |
| self.message_store | |
| .message_arriving_listener | |
| .as_ref() | |
| .unwrap() | |
| .arriving( | |
| queue_name, | |
| queue_id, | |
| queue_offset + 1, | |
| Some(dispatch_request.tags_code), | |
| dispatch_request.store_timestamp, | |
| dispatch_request.bit_map.clone(), | |
| dispatch_request.properties_map.as_ref(), | |
| ); | |
| } | |
| } | |
| } |
Which Issue(s) This PR Fixes(Closes)
Fixes #729
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
notify_message_arrive_in_batchto enhance message arrival notifications in batch mode.Improvements
DefaultMessageStoreto accommodate batch message notifications.ReputMessageServiceto support the new message notification feature.Bug Fixes