[ISSUE #1691]🍻Implement ChangeInvisibleTimeProcessor#ack_origin method🚀#1732
[ISSUE #1691]🍻Implement ChangeInvisibleTimeProcessor#ack_origin method🚀#1732rocketmq-rust-bot merged 1 commit intomainfrom
Conversation
WalkthroughThe pull request introduces several modifications primarily centered around the Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1732 +/- ##
==========================================
- Coverage 28.10% 28.07% -0.03%
==========================================
Files 466 466
Lines 64103 64163 +60
==========================================
Hits 18016 18016
- Misses 46087 46147 +60 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 5
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
rocketmq-broker/src/broker_runtime.rs(2 hunks)rocketmq-broker/src/processor/change_invisible_time_processor.rs(5 hunks)rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs(1 hunks)rocketmq-store/src/pop/ack_msg.rs(1 hunks)rocketmq-store/src/pop/batch_ack_msg.rs(1 hunks)rocketmq-store/src/stats/broker_stats_manager.rs(1 hunks)
🔇 Additional comments (3)
rocketmq-store/src/pop/batch_ack_msg.rs (1)
24-24: Adding Default trait derivation enhances usability
Deriving the Default trait for BatchAckMsg improves its usability by allowing default instantiation, which aligns with Rust best practices.
rocketmq-store/src/pop/ack_msg.rs (1)
23-23: Deriving Default for AckMsg facilitates default initialization
Including the Default trait in AckMsg enhances its flexibility and allows for default instantiation, which can be beneficial in various contexts.
rocketmq-broker/src/broker_runtime.rs (1)
555-556: LGTM: ChangeInvisibleTimeProcessor initialization is properly structured.
The initialization includes all necessary dependencies including the new PopBufferMergeService and escape_bridge components.
| request_header: &ChangeInvisibleTimeRequestHeader, | ||
| extra_info: &[String], | ||
| ) -> crate::Result<()> { | ||
| unimplemented!("ChangeInvisibleTimeProcessor ack_origin") | ||
| let ack_msg = AckMsg { | ||
| ack_offset: request_header.offset, | ||
| start_offset: ExtraInfoUtil::get_ck_queue_offset(extra_info)?, | ||
| consumer_group: request_header.consumer_group.clone(), | ||
| topic: request_header.topic.clone(), | ||
| queue_id: request_header.queue_id, | ||
| pop_time: ExtraInfoUtil::get_pop_time(extra_info)?, | ||
| broker_name: CheetahString::from_string(ExtraInfoUtil::get_broker_name(extra_info)?), | ||
| }; | ||
|
|
||
| let rq_id = ExtraInfoUtil::get_revive_qid(extra_info)?; | ||
| self.broker_stats_manager.inc_broker_ack_nums(1); | ||
| self.broker_stats_manager.inc_group_ack_nums( | ||
| request_header.consumer_group.as_str(), | ||
| request_header.topic.as_str(), | ||
| 1, | ||
| ); | ||
| if self.pop_buffer_merge_service.add_ack(rq_id, &ack_msg) { | ||
| return Ok(()); | ||
| } | ||
| let mut inner = MessageExtBrokerInner::default(); | ||
| inner.set_topic(self.revive_topic.clone()); | ||
| inner.set_body(Bytes::from(ack_msg.encode()?)); | ||
| inner.message_ext_inner.queue_id = rq_id; | ||
| inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG)); | ||
| inner.message_ext_inner.born_timestamp = get_current_millis() as i64; | ||
| inner.message_ext_inner.born_host = self.store_host; | ||
| inner.message_ext_inner.store_host = self.store_host; | ||
| let deliver_time_ms = ExtraInfoUtil::get_pop_time(extra_info)? | ||
| + ExtraInfoUtil::get_invisible_time(extra_info)?; | ||
| inner.set_delay_time_ms(deliver_time_ms as u64); | ||
| inner.message_ext_inner.put_property( | ||
| CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), | ||
| CheetahString::from(PopMessageProcessor::gen_ack_unique_id(&ack_msg)), | ||
| ); | ||
| inner.properties_string = | ||
| message_decoder::message_properties_to_string(inner.get_properties()); | ||
| let result = self | ||
| .escape_bridge | ||
| .put_message_to_specific_queue(inner) | ||
| .await; | ||
| match result.put_message_status() { | ||
| PutMessageStatus::PutOk | ||
| | PutMessageStatus::FlushDiskTimeout | ||
| | PutMessageStatus::FlushSlaveTimeout | ||
| | PutMessageStatus::SlaveNotAvailable | ||
| | PutMessageStatus::ServiceNotAvailable => {} | ||
| _ => { | ||
| error!( | ||
| "change Invisible, put ack msg error: {}", | ||
| result.put_message_status() | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Improve error handling in the ack_origin method
In the ack_origin method, if add_ack returns false, the code proceeds to send the message via escape_bridge. It's unclear whether this is the intended behavior. Please verify that the logic correctly handles the return value of add_ack.
Additionally, when put_message_to_specific_queue fails (other than statuses like PutOk, FlushDiskTimeout, etc.), the error is only logged, and no action is taken. This could lead to silent failures in message acknowledgment.
Consider the following improvements:
-
Clarify the logic based on
add_ackreturn value:Ensure that the behavior when
add_ackreturnstrueorfalsealigns with the intended message acknowledgment flow. Iffalseindicates a failure to add the acknowledgment, additional error handling or retry logic may be necessary. -
Handle failures from
put_message_to_specific_queue:Implement error propagation or a retry mechanism when
put_message_to_specific_queuefails. For example, you might return an error fromack_originor increment a failure metric to monitor such occurrences.
| let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port) | ||
| .parse::<SocketAddr>() | ||
| .unwrap(); |
There was a problem hiding this comment.
Avoid using .unwrap() to prevent potential panics
Using .unwrap() on parse::<SocketAddr>() can cause the application to panic if the parsing fails due to an invalid IP address or port. It's safer to handle the potential error to improve the robustness of the application.
Apply this diff to handle the parsing error:
-let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
- .parse::<SocketAddr>()
- .unwrap();
+let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
+ .parse::<SocketAddr>()
+ .map_err(|e| {
+ error!("Failed to parse store_host: {}", e);
+ e
+ })?This change requires updating the return type of the new function to Result<Self, ErrorType> and handling the error appropriately where the constructor is called.
Committable suggestion skipped: line range outside the PR's diff.
| pub fn add_ack(&mut self, _revive_qid: i32, _ack_msg: &AckMsg) -> bool { | ||
| unimplemented!("Not implemented yet"); | ||
| } |
There was a problem hiding this comment.
Implement the add_ack method in PopBufferMergeService
The add_ack method is currently unimplemented, which will cause a panic if called. This method is crucial for acknowledging messages within the pop buffer merge service.
Would you like assistance in implementing this method or creating a GitHub issue to track this task?
|
|
||
| pub fn inc_broker_ack_nums(&self, inc_value: i32) {} |
There was a problem hiding this comment.
Implementation missing for broker acknowledgment tracking.
The inc_broker_ack_nums method is empty but should increment the broker acknowledgment statistics. This is inconsistent with other similar methods in the class.
Consider implementing similar to other increment methods:
-pub fn inc_broker_ack_nums(&self, inc_value: i32) {}
+pub fn inc_broker_ack_nums(&self, inc_value: i32) {
+ if let Some(stats_item_set) = self.stats_table.read().get(Self::BROKER_ACK_NUMS) {
+ stats_item_set.add_value("", inc_value);
+ }
+}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub fn inc_broker_ack_nums(&self, inc_value: i32) {} | |
| pub fn inc_broker_ack_nums(&self, inc_value: i32) { | |
| if let Some(stats_item_set) = self.stats_table.read().get(Self::BROKER_ACK_NUMS) { | |
| stats_item_set.add_value("", inc_value); | |
| } | |
| } |
| ArcMut::new(PopBufferMergeService), | ||
| self.escape_bridge.clone(), |
There was a problem hiding this comment.
💡 Codebase verification
PopBufferMergeService is not implemented and will cause runtime failures
The service has an unimplemented add_ack method that will panic at runtime with "Not implemented yet". This is a critical issue since the service is being used in the broker's initialization path.
rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs:24: Methodadd_ackthrows unimplemented errorrocketmq-broker/src/broker_runtime.rs:555: Service is being initialized and used despite being incomplete
🔗 Analysis chain
Verify PopBufferMergeService initialization.
The PopBufferMergeService is instantiated without any configuration or parameters. This bare initialization may not provide the required functionality for message acknowledgment handling.
Run this script to check the service implementation:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check PopBufferMergeService implementation
# Expect: Service should have methods for handling message acknowledgments
ast-grep --pattern 'struct PopBufferMergeService {
$$$
}'
ast-grep --pattern 'impl PopBufferMergeService {
$$$
}'
Length of output: 677
Which Issue(s) This PR Fixes(Closes)
Fixes #1691
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
PopBufferMergeServiceto enhance processing capabilities.BrokerStatsManagerfor incrementing acknowledgment counts.Improvements
ChangeInvisibleTimeProcessorwith additional fields and error handling for message acknowledgment.AckMsgandBatchAckMsgstructs to support default initialization.Bug Fixes
process_request_innermethod for topic configuration and queue ID validation.