Skip to content

[ISSUE #1546]🚀Rocketmq-rust broker supports RequestCode ConsumerSendMsgBack(36)🔥#1547

Merged
rocketmq-rust-bot merged 2 commits intomainfrom
feature-1546
Dec 4, 2024
Merged

[ISSUE #1546]🚀Rocketmq-rust broker supports RequestCode ConsumerSendMsgBack(36)🔥#1547
rocketmq-rust-bot merged 2 commits intomainfrom
feature-1546

Conversation

@mxsm
Copy link
Copy Markdown
Owner

@mxsm mxsm commented Dec 4, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1546

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Enhanced message processing capabilities with the addition of hooks for post-consumption actions.
    • Support for asynchronous message backtracking and improved handling of retries and dead-letter queues.
  • Bug Fixes

    • Improved validation logic for broker permissions and subscription groups during message processing.
  • Refactor

    • Streamlined response handling in message put operations for better clarity and efficiency.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Dec 4, 2024

Walkthrough

The changes in this pull request introduce enhancements to the ReplyMessageProcessor and SendMessageProcessor within the RocketMQ broker. A new vector, consume_message_hook_vec, is added to both processors, allowing for the registration of hooks related to message consumption. The Inner struct in both files has been updated to include this vector and a store_host field. Additionally, the consumer_send_msg_back method in SendMessageProcessor has been modified to be asynchronous, improving message backtracking and error handling capabilities.

Changes

File Path Change Summary
rocketmq-broker/src/processor/reply_message_processor.rs - Added consume_message_hook_vec: ArcMut<Vec<()>> and store_host: SocketAddr to Inner struct.
- Updated process_reply_message_request to include store_host in msg_inner.
rocketmq-broker/src/processor/send_message_processor.rs - Added consume_message_hook_vec: ArcMut<Vec<Box<dyn ConsumeMessageHook>>> to Inner struct.
- Updated consumer_send_msg_back to be asynchronous.
- Added methods: has_consume_message_hook and execute_consume_message_hook_after.
- Refactored handle_put_message_result.

Assessment against linked issues

Objective Addressed Explanation
Support for RequestCode ConsumerSendMsgBack (36) (#1546)

Possibly related issues

Possibly related PRs

Suggested reviewers

  • SpaceXCN
  • TeslaRustor

Poem

In the land of code where messages flow,
New hooks are added, watch them grow!
With hosts and vectors, we dance and play,
Enhancing our broker in a clever way.
So let’s hop along, with joy in our hearts,
For every change, a brand new start! 🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@rocketmq-rust-bot
Copy link
Copy Markdown
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label Dec 4, 2024
@rocketmq-rust-bot rocketmq-rust-bot added the waiting-review waiting review this PR label Dec 4, 2024
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Outside diff range and nitpick comments (2)
rocketmq-broker/src/processor/send_message_processor.rs (2)

43-43: Review the necessity of the added import for TopicSysFlag

An import statement for TopicSysFlag has been added. Please verify that TopicSysFlag is utilized within the code to justify this import. Unused imports can clutter the code and should be removed if not needed.


49-49: Confirm usage of MessageAccessor after adding its import

The import MessageAccessor::MessageAccessor has been added. Ensure that methods from MessageAccessor are actually used in the code. If not, consider removing the import to keep the code clean.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between e4eb0de and 1b7194c.

📒 Files selected for processing (2)
  • rocketmq-broker/src/processor/reply_message_processor.rs (2 hunks)
  • rocketmq-broker/src/processor/send_message_processor.rs (11 hunks)
🔇 Additional comments (8)
rocketmq-broker/src/processor/send_message_processor.rs (6)

59-59: Addition of ConsumerSendMsgBackRequestHeader import

The import for ConsumerSendMsgBackRequestHeader has been added, indicating new functionality related to message consumption. Verify that this header is correctly used in handling consumer send-back messages.


71-71: Utilize the imported PutMessageStatus effectively

The PutMessageStatus is imported for handling message storage results. Ensure all possible statuses are handled appropriately in the code to prevent unexpected behavior.


205-205: Add consume_message_hook_vec to the Inner struct

The field consume_message_hook_vec has been added to the Inner struct. Ensure it is correctly initialized and managed, and that all access to it is thread-safe.


214-214: Initialize store_host in Inner struct

The store_host field is now part of Inner. Verify that store_host is initialized correctly and used consistently across the codebase for accurate message metadata.


980-980: Correct initialization of store_host in Inner struct

The store_host field is initialized. Confirm that the socket address is correctly parsed and that error handling is in place if parsing fails.


1004-1009: Verify execution of consume message hooks

In the execute_consume_message_hook_after method, hooks are iterated and executed. Ensure that the hooks modify the context as intended and that any exceptions within hooks are properly handled to prevent unexpected termination.

rocketmq-broker/src/processor/reply_message_processor.rs (2)

85-85: Include consume_message_hook_vec in Inner struct

The consume_message_hook_vec field has been added to the Inner struct. Ensure that any consume message hooks are properly managed and invoked within the ReplyMessageProcessor.


94-94: Initialization of store_host in Inner struct

The store_host is now part of the Inner struct. Verify that this address is correctly assigned and used throughout the message processing, specifically for accurate message metadata.

use crate::client::manager::producer_manager::ProducerManager;
use crate::client::net::broker_to_client::Broker2Client;
use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager;
use crate::mqtrace::consume_message_context::ConsumeMessageContext;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Implement ConsumeMessageContext and ConsumeMessageHook correctly

New imports for ConsumeMessageContext and ConsumeMessageHook suggest additions to message consumption hooks. Confirm that these hooks are properly integrated and invoked at the correct points in the message processing flow.

Consider adding documentation or comments explaining how these hooks are intended to be used, which can aid future maintenance.

Comment on lines +642 to +693
rocketmq_store::base::message_status_enum::PutMessageStatus::PutOk => {
send_ok = true;
response.set_code_ref(RemotingSysResponseCode::Success);
},
rocketmq_store::base::message_status_enum::PutMessageStatus::FlushDiskTimeout => {
send_ok = true;
response.set_code_ref(ResponseCode::FlushDiskTimeout);
},
rocketmq_store::base::message_status_enum::PutMessageStatus::FlushSlaveTimeout => {
send_ok = true;
response.set_code_ref(ResponseCode::FlushSlaveTimeout);
},
rocketmq_store::base::message_status_enum::PutMessageStatus::SlaveNotAvailable =>{
send_ok = true;
response.set_code_ref(ResponseCode::SlaveNotAvailable);
},
rocketmq_store::base::message_status_enum::PutMessageStatus::ServiceNotAvailable => {
response.set_code_mut(ResponseCode::ServiceNotAvailable).set_remark_mut("service not available now. It may be caused by one of the following reasons: \
the broker's disk is full %s, messages are put to the slave, message store has been shut down, etc.");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::CreateMappedFileFailed => {
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("create mapped file failed, remoting_server is busy or broken.");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::MessageIllegal |
rocketmq_store::base::message_status_enum::PutMessageStatus::PropertiesSizeExceeded => {
response.set_code_mut(ResponseCode::MessageIllegal).set_remark_mut("the message is illegal, maybe msg body or properties length not matched. msg body length limit B, msg properties length limit 32KB.");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::OsPageCacheBusy =>{
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::UnknownError => {
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("UNKNOWN_ERROR");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::InSyncReplicasNotEnough => {
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("in-sync replicas not enough");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::LmqConsumeQueueNumExceeded => {
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("[LMQ_CONSUME_QUEUE_NUM_EXCEEDED]broker config enableLmq and enableMultiDispatch, lmq consumeQueue num exceed maxLmqConsumeQueueNum config num, default limit 2w.");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerFlowControl => {
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("timer message is under flow control, max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerMsgIllegal => {
response.set_code_mut(ResponseCode::MessageIllegal).set_remark_mut("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time");
},
rocketmq_store::base::message_status_enum::PutMessageStatus::WheelTimerNotEnable => {
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("accurate timer message is not enabled, timerWheelEnable is %s");
},
_ => {
response.set_code_mut(RemotingSysResponseCode::SystemError).set_remark_mut("UNKNOWN_ERROR DEFAULT");
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure comprehensive handling of all PutMessageStatus variants

An extensive match block handles different PutMessageStatus cases. Double-check that every status is accounted for and that the response codes and remarks accurately reflect the nature of each status.

It's vital to provide clear and user-friendly error messages. Consider reviewing the messages for clarity and consistency.

pub(crate) struct Inner<MS, TS> {
pub(crate) topic_config_manager: TopicConfigManager,
pub(crate) send_message_hook_vec: ArcMut<Vec<Box<dyn SendMessageHook>>>,
pub(crate) consume_message_hook_vec: ArcMut<Vec<Box<dyn ConsumeMessageHook>>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure thread-safe access to consume_message_hook_vec

The consume_message_hook_vec field is accessed within potentially concurrent contexts. Verify that all accesses are properly synchronized to prevent data races or other concurrency issues.

If ArcMut<Vec<Box<dyn ConsumeMessageHook>>> does not ensure thread safety, consider wrapping it in mutexes or using concurrent data structures.

Comment on lines +983 to +997
impl<MS, TS> Inner<MS, TS>
where
MS: MessageStore,
TS: TransactionalMessageService,
{
#[inline]
pub fn has_send_message_hook(&self) -> bool {
self.send_message_hook_vec.is_empty()
}

#[inline]
pub fn has_consume_message_hook(&self) -> bool {
self.consume_message_hook_vec.is_empty()
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix logic in has_consume_message_hook method

The has_consume_message_hook method currently returns true when consume_message_hook_vec is empty, which is counterintuitive and may lead to incorrect behavior.

Apply this diff to correct the logic:

 #[inline]
 pub fn has_consume_message_hook(&self) -> bool {
-    self.consume_message_hook_vec.is_empty()
+    !self.consume_message_hook_vec.is_empty()
 }

This change ensures that has_consume_message_hook returns true when hooks are present, aligning with expected behavior.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
impl<MS, TS> Inner<MS, TS>
where
MS: MessageStore,
TS: TransactionalMessageService,
{
#[inline]
pub fn has_send_message_hook(&self) -> bool {
self.send_message_hook_vec.is_empty()
}
#[inline]
pub fn has_consume_message_hook(&self) -> bool {
self.consume_message_hook_vec.is_empty()
}
impl<MS, TS> Inner<MS, TS>
where
MS: MessageStore,
TS: TransactionalMessageService,
{
#[inline]
pub fn has_send_message_hook(&self) -> bool {
self.send_message_hook_vec.is_empty()
}
#[inline]
pub fn has_consume_message_hook(&self) -> bool {
!self.consume_message_hook_vec.is_empty()
}

@rocketmq-rust-bot
Copy link
Copy Markdown
Collaborator

🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
rocketmq-broker/src/processor/send_message_processor.rs (2)

1229-1230: Address TODO comments for schedule topic and DLQ handling

The implementation has unfinished TODO comments for schedule topic and DLQ handling. These need to be implemented for complete functionality.

Would you like me to help implement these missing features or create GitHub issues to track them?

Also applies to: 1233-1234


1218-1245: Enhance error handling in put_message_result match

The error handling could be more informative. Consider:

  1. Adding specific error messages for each PutMessageStatus variant
  2. Including more context in error responses
            _ => (
                RemotingCommand::create_response_command_with_code_remark(
                    ResponseCode::SystemError,
-                   put_message_result.put_message_status().to_string(),
+                   format!("Failed to put message back: {}, topic: {}", 
+                          put_message_result.put_message_status(),
+                          inner_topic),
                ),
                false,
            ),
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 1b7194c and da8c581.

📒 Files selected for processing (1)
  • rocketmq-broker/src/processor/send_message_processor.rs (11 hunks)
🔇 Additional comments (3)
rocketmq-broker/src/processor/send_message_processor.rs (3)

993-996: LGTM: Correct implementation of has_consume_message_hook

The implementation correctly checks if the hook vector is non-empty.


642-693: LGTM: Comprehensive handling of PutMessageStatus variants

The match statement thoroughly handles all possible message status variants with appropriate response codes and remarks.


970-970: Ensure thread-safe access to consume_message_hook_vec

The hook vector is wrapped in ArcMut which provides interior mutability, but consider documenting the thread-safety guarantees and any synchronization requirements.

✅ Verification successful

Let me analyze the access patterns shown in the results. I can see that:

  1. consume_message_hook_vec is initialized with ArcMut::new(Vec::new())
  2. It's only accessed in two ways:
    • is_empty() check in has_consume_message_hook
    • iter() in execute_consume_message_hook_after

Let me verify if there are any mutation points by checking for any add/remove operations on the vector.


Let me try a different approach to find any mutations using ripgrep:


Based on the evidence gathered, I can now generate the final response:

Thread safety is properly handled for consume_message_hook_vec

The consume_message_hook_vec implementation is thread-safe as:

  • It's wrapped in ArcMut for safe shared ownership
  • Only read-only operations are performed (is_empty() and iter())
  • No mutations are observed after initialization
  • Follows the same pattern as other hook implementations in the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for any concurrent access patterns to consume_message_hook_vec
rg -A 5 "consume_message_hook_vec" 

Length of output: 3091


Script:

#!/bin/bash
# Check for any mutations to consume_message_hook_vec
ast-grep --pattern 'consume_message_hook_vec.$_'

Length of output: 50


Script:

#!/bin/bash
# Search for any methods that might modify the hook vector
rg -B 3 -A 3 "fn.*consume.*hook" 

Length of output: 3175

Comment on lines +1032 to +1283
pub(crate) async fn consumer_send_msg_back(
&mut self,
_channel: &Channel,
_ctx: &ConnectionHandlerContext,
_request: &RemotingCommand,
request: &RemotingCommand,
) -> crate::Result<Option<RemotingCommand>> {
todo!()
let request_header = request
.decode_command_custom_header::<ConsumerSendMsgBackRequestHeader>()
.map_err(|e| RemotingCommandError(e.to_string()))?;
if self.broker_config.broker_identity.broker_id != mix_all::MASTER_ID {
return Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
format!(
"no master available along with {}",
self.broker_config.broker_ip1
),
),
));
}
let subscription_group_config = self
.subscription_group_manager
.find_subscription_group_config(&request_header.group);
if subscription_group_config.is_none() {
return Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SubscriptionNotExist,
format!(
"subscription group not exist, {} {}",
request_header.group,
FAQUrl::suggest_todo(FAQUrl::SUBSCRIPTION_GROUP_NOT_EXIST)
),
),
));
}

if !PermName::is_writeable(self.broker_config.broker_permission) {
return Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::NoPermission,
format!(
"the broker[{}-{}] sending message is forbidden",
self.broker_config.broker_identity.broker_name,
self.broker_config.broker_ip1
),
),
));
}

let subscription_group_config = subscription_group_config.unwrap();
if subscription_group_config.retry_queue_nums() <= 0 {
return Ok(Some(RemotingCommand::create_remoting_command(
ResponseCode::Success,
)));
}
let mut new_topic =
CheetahString::from_string(mix_all::get_retry_topic(request_header.group.as_str()));
let mut queue_id_int =
rand::thread_rng().gen_range(0..subscription_group_config.retry_queue_nums());
let topic_sys_flag = if request_header.unit_mode {
TopicSysFlag::build_sys_flag(false, true)
} else {
0
};
let topic_config = self
.topic_config_manager
.create_topic_in_send_message_back_method(
&new_topic,
subscription_group_config.retry_queue_nums(),
PermName::PERM_WRITE | PermName::PERM_READ,
false,
topic_sys_flag,
);
if topic_config.is_none() {
return Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
format!("topic {} not exist", new_topic),
),
));
}
let topic_config = topic_config.unwrap();
if !PermName::is_writeable(topic_config.perm) {
return Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::NoPermission,
format!("the topic[{}] sending message is forbidden", new_topic),
),
));
}
let msg_ext = self
.message_store
.look_message_by_offset(request_header.offset);
if msg_ext.is_none() {
return Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
format!(
"look message by offset failed, the offset is {}",
request_header.offset
),
),
));
}

let mut msg_ext = msg_ext.unwrap();
let retry_topic = msg_ext.get_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_RETRY_TOPIC,
));
if retry_topic.is_none() {
let topic = msg_ext.get_topic().clone();
MessageAccessor::put_property(
&mut msg_ext,
CheetahString::from_static_str(MessageConst::PROPERTY_RETRY_TOPIC),
topic,
);
}
msg_ext.set_wait_store_msg_ok(false);
let mut delay_level = request_header.delay_level;
let mut max_reconsume_times = subscription_group_config.retry_max_times();
if request.version() >= RocketMqVersion::V349 as i32 {
if let Some(num) = request_header.max_reconsume_times {
max_reconsume_times = num;
}
}

let is_dlq = if msg_ext.reconsume_times >= max_reconsume_times || delay_level < 0 {
new_topic = CheetahString::from_string(mix_all::get_dlq_topic(&request_header.group));
queue_id_int = 0;
let topic_config_inner = self
.topic_config_manager
.create_topic_in_send_message_back_method(
&new_topic,
DLQ_NUMS_PER_GROUP as i32,
PermName::PERM_WRITE | PermName::PERM_READ,
false,
0,
);
if topic_config_inner.is_none() {
return Ok(Some(
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
format!("topic {} not exist", new_topic),
),
));
}
msg_ext.set_delay_time_level(0);
true
} else {
if 0 == delay_level {
delay_level = 3 + msg_ext.reconsume_times();
}
msg_ext.set_delay_time_level(delay_level);
false
};
let mut msg_inner = MessageExtBrokerInner::default();
msg_inner.set_topic(new_topic);
if let Some(body) = msg_ext.get_body() {
msg_inner.set_body(body.clone());
}
msg_inner.set_flag(msg_ext.get_flag());
MessageAccessor::set_properties(&mut msg_inner, msg_ext.get_properties().clone());
msg_inner.properties_string = message_properties_to_string(msg_ext.get_properties());
msg_inner.tags_code = MessageExtBrokerInner::tags_string_to_tags_code(
msg_ext.get_tags().unwrap_or_default().as_str(),
);
msg_inner.message_ext_inner.queue_id = queue_id_int;
msg_inner.message_ext_inner.sys_flag = msg_ext.sys_flag;
msg_inner.message_ext_inner.born_timestamp = msg_ext.born_timestamp;
msg_inner.message_ext_inner.born_host = msg_ext.born_host;
msg_inner.message_ext_inner.store_host = self.store_host;
msg_inner.message_ext_inner.reconsume_times = msg_ext.reconsume_times + 1;

let origin_msg_id = if let Some(id) = MessageAccessor::get_origin_message_id(&msg_ext) {
id
} else {
msg_ext.msg_id.clone()
};
MessageAccessor::set_origin_message_id(&mut msg_inner, origin_msg_id);
msg_inner.properties_string = message_properties_to_string(msg_ext.get_properties());

let inner_topic = msg_inner.get_topic().clone();
let put_message_result = self.message_store.put_message(msg_inner).await;
let commercial_owner = request
.get_ext_fields()
.and_then(|value| value.get(BrokerStatsManager::COMMERCIAL_OWNER).cloned());
let (response, succeeded) = match put_message_result.put_message_status() {
PutMessageStatus::PutOk => {
let mut _back_topic = msg_ext.get_topic().clone();
let correct_topic = msg_ext.get_property(&CheetahString::from_static_str(
MessageConst::PROPERTY_RETRY_TOPIC,
));
if let Some(topic) = correct_topic {
_back_topic = topic;
}

if TopicValidator::RMQ_SYS_SCHEDULE_TOPIC == inner_topic {
//TODO: implement this
}

if is_dlq {
// TODO: implement this
}
(RemotingCommand::create_response_command(), true)
}

_ => (
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SystemError,
put_message_result.put_message_status().to_string(),
),
false,
),
};
if self.has_consume_message_hook()
&& request_header
.origin_msg_id
.is_some_and(|ref id| !id.is_empty())
{
let mut context = ConsumeMessageContext::default();
let namespace =
NamespaceUtil::get_namespace_from_resource(request_header.group.as_str());
context.namespace = CheetahString::from_string(namespace);
if let Some(ref topic) = request_header.origin_topic {
context.topic = topic.clone();
}
context.consumer_group = request_header.group.clone();
context.commercial_rcv_stats = StatsType::SendBack;
context.commercial_rcv_times = 1;
context.commercial_owner = commercial_owner;
context.account_auth_type = request
.get_ext_fields()
.and_then(|value| value.get(BrokerStatsManager::ACCOUNT_AUTH_TYPE).cloned());
context.account_owner_parent = request
.get_ext_fields()
.and_then(|value| value.get(BrokerStatsManager::ACCOUNT_OWNER_PARENT).cloned());
context.account_owner_self = request
.get_ext_fields()
.and_then(|value| value.get(BrokerStatsManager::ACCOUNT_OWNER_SELF).cloned());
context.rcv_stat = if is_dlq {
StatsType::SendBackToDlq
} else {
StatsType::SendBack
};
context.success = succeeded;
context.rcv_msg_num = 1;
context.rcv_msg_size = 0;
context.commercial_rcv_msg_num = if succeeded { 1 } else { 0 };
self.execute_consume_message_hook_after(&mut context);
}

Ok(Some(response))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor consumer_send_msg_back for better maintainability

The method is lengthy and handles multiple concerns. Consider breaking it down into smaller, focused methods:

  • Message validation logic (lines 1038-1079)
  • DLQ handling logic (lines 1158-1186)
  • Message preparation (lines 1187-1212)
  • Hook execution (lines 1246-1281)

@rocketmq-rust-bot rocketmq-rust-bot merged commit c4c3739 into main Dec 4, 2024
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Dec 4, 2024
@mxsm mxsm deleted the feature-1546 branch December 4, 2024 15:51
@codecov
Copy link
Copy Markdown

codecov bot commented Dec 4, 2024

Codecov Report

Attention: Patch coverage is 0% with 296 lines in your changes missing coverage. Please review.

Project coverage is 24.95%. Comparing base (e4eb0de) to head (da8c581).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...tmq-broker/src/processor/send_message_processor.rs 0.00% 294 Missing ⚠️
...mq-broker/src/processor/reply_message_processor.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1547      +/-   ##
==========================================
- Coverage   25.05%   24.95%   -0.11%     
==========================================
  Files         450      450              
  Lines       59609    59851     +242     
==========================================
  Hits        14933    14933              
- Misses      44676    44918     +242     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Rocketmq-rust broker supports RequestCode ConsumerSendMsgBack(36)

4 participants