[ISSUE #1546]🚀Rocketmq-rust broker supports RequestCode ConsumerSendMsgBack(36)🔥#1547
[ISSUE #1546]🚀Rocketmq-rust broker supports RequestCode ConsumerSendMsgBack(36)🔥#1547rocketmq-rust-bot merged 2 commits intomainfrom
Conversation
WalkthroughThe changes in this pull request introduce enhancements to the Changes
Assessment against linked issues
Possibly related issues
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Outside diff range and nitpick comments (2)
rocketmq-broker/src/processor/send_message_processor.rs (2)
43-43: Review the necessity of the added import forTopicSysFlagAn import statement for
TopicSysFlaghas been added. Please verify thatTopicSysFlagis utilized within the code to justify this import. Unused imports can clutter the code and should be removed if not needed.
49-49: Confirm usage ofMessageAccessorafter adding its importThe import
MessageAccessor::MessageAccessorhas been added. Ensure that methods fromMessageAccessorare actually used in the code. If not, consider removing the import to keep the code clean.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
rocketmq-broker/src/processor/reply_message_processor.rs(2 hunks)rocketmq-broker/src/processor/send_message_processor.rs(11 hunks)
🔇 Additional comments (8)
rocketmq-broker/src/processor/send_message_processor.rs (6)
59-59: Addition of ConsumerSendMsgBackRequestHeader import
The import for ConsumerSendMsgBackRequestHeader has been added, indicating new functionality related to message consumption. Verify that this header is correctly used in handling consumer send-back messages.
71-71: Utilize the imported PutMessageStatus effectively
The PutMessageStatus is imported for handling message storage results. Ensure all possible statuses are handled appropriately in the code to prevent unexpected behavior.
205-205: Add consume_message_hook_vec to the Inner struct
The field consume_message_hook_vec has been added to the Inner struct. Ensure it is correctly initialized and managed, and that all access to it is thread-safe.
214-214: Initialize store_host in Inner struct
The store_host field is now part of Inner. Verify that store_host is initialized correctly and used consistently across the codebase for accurate message metadata.
980-980: Correct initialization of store_host in Inner struct
The store_host field is initialized. Confirm that the socket address is correctly parsed and that error handling is in place if parsing fails.
1004-1009: Verify execution of consume message hooks
In the execute_consume_message_hook_after method, hooks are iterated and executed. Ensure that the hooks modify the context as intended and that any exceptions within hooks are properly handled to prevent unexpected termination.
rocketmq-broker/src/processor/reply_message_processor.rs (2)
85-85: Include consume_message_hook_vec in Inner struct
The consume_message_hook_vec field has been added to the Inner struct. Ensure that any consume message hooks are properly managed and invoked within the ReplyMessageProcessor.
94-94: Initialization of store_host in Inner struct
The store_host is now part of the Inner struct. Verify that this address is correctly assigned and used throughout the message processing, specifically for accurate message metadata.
| use crate::client::manager::producer_manager::ProducerManager; | ||
| use crate::client::net::broker_to_client::Broker2Client; | ||
| use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager; | ||
| use crate::mqtrace::consume_message_context::ConsumeMessageContext; |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Implement ConsumeMessageContext and ConsumeMessageHook correctly
New imports for ConsumeMessageContext and ConsumeMessageHook suggest additions to message consumption hooks. Confirm that these hooks are properly integrated and invoked at the correct points in the message processing flow.
Consider adding documentation or comments explaining how these hooks are intended to be used, which can aid future maintenance.
| rocketmq_store::base::message_status_enum::PutMessageStatus::PutOk => { | ||
| send_ok = true; | ||
| response.set_code_ref(RemotingSysResponseCode::Success); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::FlushDiskTimeout => { | ||
| send_ok = true; | ||
| response.set_code_ref(ResponseCode::FlushDiskTimeout); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::FlushSlaveTimeout => { | ||
| send_ok = true; | ||
| response.set_code_ref(ResponseCode::FlushSlaveTimeout); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::SlaveNotAvailable =>{ | ||
| send_ok = true; | ||
| response.set_code_ref(ResponseCode::SlaveNotAvailable); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::ServiceNotAvailable => { | ||
| response.set_code_mut(ResponseCode::ServiceNotAvailable).set_remark_mut("service not available now. It may be caused by one of the following reasons: \ | ||
| the broker's disk is full %s, messages are put to the slave, message store has been shut down, etc."); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::CreateMappedFileFailed => { | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("create mapped file failed, remoting_server is busy or broken."); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::MessageIllegal | | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::PropertiesSizeExceeded => { | ||
| response.set_code_mut(ResponseCode::MessageIllegal).set_remark_mut("the message is illegal, maybe msg body or properties length not matched. msg body length limit B, msg properties length limit 32KB."); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::OsPageCacheBusy =>{ | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::UnknownError => { | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("UNKNOWN_ERROR"); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::InSyncReplicasNotEnough => { | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("in-sync replicas not enough"); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::LmqConsumeQueueNumExceeded => { | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w."); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerFlowControl => { | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("timer message is under flow control, max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control"); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerMsgIllegal => { | ||
| response.set_code_mut(ResponseCode::MessageIllegal).set_remark_mut("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time"); | ||
| }, | ||
| rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerNotEnable => { | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("accurate timer message is not enabled, timerWheelEnable is %s"); | ||
| }, | ||
| _ => { | ||
| response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("UNKNOWN_ERROR DEFAULT"); | ||
| } | ||
| } |
There was a problem hiding this comment.
Ensure comprehensive handling of all PutMessageStatus variants
An extensive match block handles different PutMessageStatus cases. Double-check that every status is accounted for and that the response codes and remarks accurately reflect the nature of each status.
It's vital to provide clear and user-friendly error messages. Consider reviewing the messages for clarity and consistency.
| pub(crate) struct Inner<MS, TS> { | ||
| pub(crate) topic_config_manager: TopicConfigManager, | ||
| pub(crate) send_message_hook_vec: ArcMut<Vec<Box<dyn SendMessageHook>>>, | ||
| pub(crate) consume_message_hook_vec: ArcMut<Vec<Box<dyn ConsumeMessageHook>>>, |
There was a problem hiding this comment.
Ensure thread-safe access to consume_message_hook_vec
The consume_message_hook_vec field is accessed within potentially concurrent contexts. Verify that all accesses are properly synchronized to prevent data races or other concurrency issues.
If ArcMut<Vec<Box<dyn ConsumeMessageHook>>> does not ensure thread safety, consider wrapping it in mutexes or using concurrent data structures.
| impl<MS, TS> Inner<MS, TS> | ||
| where | ||
| MS: MessageStore, | ||
| TS: TransactionalMessageService, | ||
| { | ||
| #[inline] | ||
| pub fn has_send_message_hook(&self) -> bool { | ||
| self.send_message_hook_vec.is_empty() | ||
| } | ||
|
|
||
| #[inline] | ||
| pub fn has_consume_message_hook(&self) -> bool { | ||
| self.consume_message_hook_vec.is_empty() | ||
| } | ||
|
|
There was a problem hiding this comment.
Fix logic in has_consume_message_hook method
The has_consume_message_hook method currently returns true when consume_message_hook_vec is empty, which is counterintuitive and may lead to incorrect behavior.
Apply this diff to correct the logic:
#[inline]
pub fn has_consume_message_hook(&self) -> bool {
- self.consume_message_hook_vec.is_empty()
+ !self.consume_message_hook_vec.is_empty()
}This change ensures that has_consume_message_hook returns true when hooks are present, aligning with expected behavior.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| impl<MS, TS> Inner<MS, TS> | |
| where | |
| MS: MessageStore, | |
| TS: TransactionalMessageService, | |
| { | |
| #[inline] | |
| pub fn has_send_message_hook(&self) -> bool { | |
| self.send_message_hook_vec.is_empty() | |
| } | |
| #[inline] | |
| pub fn has_consume_message_hook(&self) -> bool { | |
| self.consume_message_hook_vec.is_empty() | |
| } | |
| impl<MS, TS> Inner<MS, TS> | |
| where | |
| MS: MessageStore, | |
| TS: TransactionalMessageService, | |
| { | |
| #[inline] | |
| pub fn has_send_message_hook(&self) -> bool { | |
| self.send_message_hook_vec.is_empty() | |
| } | |
| #[inline] | |
| pub fn has_consume_message_hook(&self) -> bool { | |
| !self.consume_message_hook_vec.is_empty() | |
| } |
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
rocketmq-broker/src/processor/send_message_processor.rs (2)
1229-1230: Address TODO comments for schedule topic and DLQ handlingThe implementation has unfinished TODO comments for schedule topic and DLQ handling. These need to be implemented for complete functionality.
Would you like me to help implement these missing features or create GitHub issues to track them?
Also applies to: 1233-1234
1218-1245: Enhance error handling in put_message_result matchThe error handling could be more informative. Consider:
- Adding specific error messages for each PutMessageStatus variant
- Including more context in error responses
_ => ( RemotingCommand::create_response_command_with_code_remark( ResponseCode::SystemError, - put_message_result.put_message_status().to_string(), + format!("Failed to put message back: {}, topic: {}", + put_message_result.put_message_status(), + inner_topic), ), false, ),
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
rocketmq-broker/src/processor/send_message_processor.rs(11 hunks)
🔇 Additional comments (3)
rocketmq-broker/src/processor/send_message_processor.rs (3)
993-996: LGTM: Correct implementation of has_consume_message_hook
The implementation correctly checks if the hook vector is non-empty.
642-693: LGTM: Comprehensive handling of PutMessageStatus variants
The match statement thoroughly handles all possible message status variants with appropriate response codes and remarks.
970-970: Ensure thread-safe access to consume_message_hook_vec
The hook vector is wrapped in ArcMut which provides interior mutability, but consider documenting the thread-safety guarantees and any synchronization requirements.
✅ Verification successful
Let me analyze the access patterns shown in the results. I can see that:
consume_message_hook_vecis initialized withArcMut::new(Vec::new())- It's only accessed in two ways:
is_empty()check inhas_consume_message_hookiter()inexecute_consume_message_hook_after
Let me verify if there are any mutation points by checking for any add/remove operations on the vector.
Let me try a different approach to find any mutations using ripgrep:
Based on the evidence gathered, I can now generate the final response:
Thread safety is properly handled for consume_message_hook_vec
The consume_message_hook_vec implementation is thread-safe as:
- It's wrapped in
ArcMutfor safe shared ownership - Only read-only operations are performed (
is_empty()anditer()) - No mutations are observed after initialization
- Follows the same pattern as other hook implementations in the codebase
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for any concurrent access patterns to consume_message_hook_vec
rg -A 5 "consume_message_hook_vec"
Length of output: 3091
Script:
#!/bin/bash
# Check for any mutations to consume_message_hook_vec
ast-grep --pattern 'consume_message_hook_vec.$_'
Length of output: 50
Script:
#!/bin/bash
# Search for any methods that might modify the hook vector
rg -B 3 -A 3 "fn.*consume.*hook"
Length of output: 3175
| pub(crate) async fn consumer_send_msg_back( | ||
| &mut self, | ||
| _channel: &Channel, | ||
| _ctx: &ConnectionHandlerContext, | ||
| _request: &RemotingCommand, | ||
| request: &RemotingCommand, | ||
| ) -> crate::Result<Option<RemotingCommand>> { | ||
| todo!() | ||
| let request_header = request | ||
| .decode_command_custom_header::<ConsumerSendMsgBackRequestHeader>() | ||
| .map_err(|e| RemotingCommandError(e.to_string()))?; | ||
| if self.broker_config.broker_identity.broker_id != mix_all::MASTER_ID { | ||
| return Ok(Some( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::SystemError, | ||
| format!( | ||
| "no master available along with {}", | ||
| self.broker_config.broker_ip1 | ||
| ), | ||
| ), | ||
| )); | ||
| } | ||
| let subscription_group_config = self | ||
| .subscription_group_manager | ||
| .find_subscription_group_config(&request_header.group); | ||
| if subscription_group_config.is_none() { | ||
| return Ok(Some( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::SubscriptionNotExist, | ||
| format!( | ||
| "subscription group not exist, {} {}", | ||
| request_header.group, | ||
| FAQUrl::suggest_todo(FAQUrl::SUBSCRIPTION_GROUP_NOT_EXIST) | ||
| ), | ||
| ), | ||
| )); | ||
| } | ||
|
|
||
| if !PermName::is_writeable(self.broker_config.broker_permission) { | ||
| return Ok(Some( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::NoPermission, | ||
| format!( | ||
| "the broker[{}-{}] sending message is forbidden", | ||
| self.broker_config.broker_identity.broker_name, | ||
| self.broker_config.broker_ip1 | ||
| ), | ||
| ), | ||
| )); | ||
| } | ||
|
|
||
| let subscription_group_config = subscription_group_config.unwrap(); | ||
| if subscription_group_config.retry_queue_nums() <= 0 { | ||
| return Ok(Some(RemotingCommand::create_remoting_command( | ||
| ResponseCode::Success, | ||
| ))); | ||
| } | ||
| let mut new_topic = | ||
| CheetahString::from_string(mix_all::get_retry_topic(request_header.group.as_str())); | ||
| let mut queue_id_int = | ||
| rand::thread_rng().gen_range(0..subscription_group_config.retry_queue_nums()); | ||
| let topic_sys_flag = if request_header.unit_mode { | ||
| TopicSysFlag::build_sys_flag(false, true) | ||
| } else { | ||
| 0 | ||
| }; | ||
| let topic_config = self | ||
| .topic_config_manager | ||
| .create_topic_in_send_message_back_method( | ||
| &new_topic, | ||
| subscription_group_config.retry_queue_nums(), | ||
| PermName::PERM_WRITE | PermName::PERM_READ, | ||
| false, | ||
| topic_sys_flag, | ||
| ); | ||
| if topic_config.is_none() { | ||
| return Ok(Some( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::SystemError, | ||
| format!("topic {} not exist", new_topic), | ||
| ), | ||
| )); | ||
| } | ||
| let topic_config = topic_config.unwrap(); | ||
| if !PermName::is_writeable(topic_config.perm) { | ||
| return Ok(Some( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::NoPermission, | ||
| format!("the topic[{}] sending message is forbidden", new_topic), | ||
| ), | ||
| )); | ||
| } | ||
| let msg_ext = self | ||
| .message_store | ||
| .look_message_by_offset(request_header.offset); | ||
| if msg_ext.is_none() { | ||
| return Ok(Some( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::SystemError, | ||
| format!( | ||
| "look message by offset failed, the offset is {}", | ||
| request_header.offset | ||
| ), | ||
| ), | ||
| )); | ||
| } | ||
|
|
||
| let mut msg_ext = msg_ext.unwrap(); | ||
| let retry_topic = msg_ext.get_property(&CheetahString::from_static_str( | ||
| MessageConst::PROPERTY_RETRY_TOPIC, | ||
| )); | ||
| if retry_topic.is_none() { | ||
| let topic = msg_ext.get_topic().clone(); | ||
| MessageAccessor::put_property( | ||
| &mut msg_ext, | ||
| CheetahString::from_static_str(MessageConst::PROPERTY_RETRY_TOPIC), | ||
| topic, | ||
| ); | ||
| } | ||
| msg_ext.set_wait_store_msg_ok(false); | ||
| let mut delay_level = request_header.delay_level; | ||
| let mut max_reconsume_times = subscription_group_config.retry_max_times(); | ||
| if request.version() >= RocketMqVersion::V349 as i32 { | ||
| if let Some(num) = request_header.max_reconsume_times { | ||
| max_reconsume_times = num; | ||
| } | ||
| } | ||
|
|
||
| let is_dlq = if msg_ext.reconsume_times >= max_reconsume_times || delay_level < 0 { | ||
| new_topic = CheetahString::from_string(mix_all::get_dlq_topic(&request_header.group)); | ||
| queue_id_int = 0; | ||
| let topic_config_inner = self | ||
| .topic_config_manager | ||
| .create_topic_in_send_message_back_method( | ||
| &new_topic, | ||
| DLQ_NUMS_PER_GROUP as i32, | ||
| PermName::PERM_WRITE | PermName::PERM_READ, | ||
| false, | ||
| 0, | ||
| ); | ||
| if topic_config_inner.is_none() { | ||
| return Ok(Some( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::SystemError, | ||
| format!("topic {} not exist", new_topic), | ||
| ), | ||
| )); | ||
| } | ||
| msg_ext.set_delay_time_level(0); | ||
| true | ||
| } else { | ||
| if 0 == delay_level { | ||
| delay_level = 3 + msg_ext.reconsume_times(); | ||
| } | ||
| msg_ext.set_delay_time_level(delay_level); | ||
| false | ||
| }; | ||
| let mut msg_inner = MessageExtBrokerInner::default(); | ||
| msg_inner.set_topic(new_topic); | ||
| if let Some(body) = msg_ext.get_body() { | ||
| msg_inner.set_body(body.clone()); | ||
| } | ||
| msg_inner.set_flag(msg_ext.get_flag()); | ||
| MessageAccessor::set_properties(&mut msg_inner, msg_ext.get_properties().clone()); | ||
| msg_inner.properties_string = message_properties_to_string(msg_ext.get_properties()); | ||
| msg_inner.tags_code = MessageExtBrokerInner::tags_string_to_tags_code( | ||
| msg_ext.get_tags().unwrap_or_default().as_str(), | ||
| ); | ||
| msg_inner.message_ext_inner.queue_id = queue_id_int; | ||
| msg_inner.message_ext_inner.sys_flag = msg_ext.sys_flag; | ||
| msg_inner.message_ext_inner.born_timestamp = msg_ext.born_timestamp; | ||
| msg_inner.message_ext_inner.born_host = msg_ext.born_host; | ||
| msg_inner.message_ext_inner.store_host = self.store_host; | ||
| msg_inner.message_ext_inner.reconsume_times = msg_ext.reconsume_times + 1; | ||
|
|
||
| let origin_msg_id = if let Some(id) = MessageAccessor::get_origin_message_id(&msg_ext) { | ||
| id | ||
| } else { | ||
| msg_ext.msg_id.clone() | ||
| }; | ||
| MessageAccessor::set_origin_message_id(&mut msg_inner, origin_msg_id); | ||
| msg_inner.properties_string = message_properties_to_string(msg_ext.get_properties()); | ||
|
|
||
| let inner_topic = msg_inner.get_topic().clone(); | ||
| let put_message_result = self.message_store.put_message(msg_inner).await; | ||
| let commercial_owner = request | ||
| .get_ext_fields() | ||
| .and_then(|value| value.get(BrokerStatsManager::COMMERCIAL_OWNER).cloned()); | ||
| let (response, succeeded) = match put_message_result.put_message_status() { | ||
| PutMessageStatus::PutOk => { | ||
| let mut _back_topic = msg_ext.get_topic().clone(); | ||
| let correct_topic = msg_ext.get_property(&CheetahString::from_static_str( | ||
| MessageConst::PROPERTY_RETRY_TOPIC, | ||
| )); | ||
| if let Some(topic) = correct_topic { | ||
| _back_topic = topic; | ||
| } | ||
|
|
||
| if TopicValidator::RMQ_SYS_SCHEDULE_TOPIC == inner_topic { | ||
| //TODO: implement this | ||
| } | ||
|
|
||
| if is_dlq { | ||
| // TODO: implement this | ||
| } | ||
| (RemotingCommand::create_response_command(), true) | ||
| } | ||
|
|
||
| _ => ( | ||
| RemotingCommand::create_response_command_with_code_remark( | ||
| ResponseCode::SystemError, | ||
| put_message_result.put_message_status().to_string(), | ||
| ), | ||
| false, | ||
| ), | ||
| }; | ||
| if self.has_consume_message_hook() | ||
| && request_header | ||
| .origin_msg_id | ||
| .is_some_and(|ref id| !id.is_empty()) | ||
| { | ||
| let mut context = ConsumeMessageContext::default(); | ||
| let namespace = | ||
| NamespaceUtil::get_namespace_from_resource(request_header.group.as_str()); | ||
| context.namespace = CheetahString::from_string(namespace); | ||
| if let Some(ref topic) = request_header.origin_topic { | ||
| context.topic = topic.clone(); | ||
| } | ||
| context.consumer_group = request_header.group.clone(); | ||
| context.commercial_rcv_stats = StatsType::SendBack; | ||
| context.commercial_rcv_times = 1; | ||
| context.commercial_owner = commercial_owner; | ||
| context.account_auth_type = request | ||
| .get_ext_fields() | ||
| .and_then(|value| value.get(BrokerStatsManager::ACCOUNT_AUTH_TYPE).cloned()); | ||
| context.account_owner_parent = request | ||
| .get_ext_fields() | ||
| .and_then(|value| value.get(BrokerStatsManager::ACCOUNT_OWNER_PARENT).cloned()); | ||
| context.account_owner_self = request | ||
| .get_ext_fields() | ||
| .and_then(|value| value.get(BrokerStatsManager::ACCOUNT_OWNER_SELF).cloned()); | ||
| context.rcv_stat = if is_dlq { | ||
| StatsType::SendBackToDlq | ||
| } else { | ||
| StatsType::SendBack | ||
| }; | ||
| context.success = succeeded; | ||
| context.rcv_msg_num = 1; | ||
| context.rcv_msg_size = 0; | ||
| context.commercial_rcv_msg_num = if succeeded { 1 } else { 0 }; | ||
| self.execute_consume_message_hook_after(&mut context); | ||
| } | ||
|
|
||
| Ok(Some(response)) |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Refactor consumer_send_msg_back for better maintainability
The method is lengthy and handles multiple concerns. Consider breaking it down into smaller, focused methods:
- Message validation logic (lines 1038-1079)
- DLQ handling logic (lines 1158-1186)
- Message preparation (lines 1187-1212)
- Hook execution (lines 1246-1281)
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1547 +/- ##
==========================================
- Coverage 25.05% 24.95% -0.11%
==========================================
Files 450 450
Lines 59609 59851 +242
==========================================
Hits 14933 14933
- Misses 44676 44918 +242 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1546
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes
Refactor