[ISSUE #1648]🚀Add ConsumeRequest for ConsumeMessagePopConcurrentlyService🔥#1654
[ISSUE #1648]🚀Add ConsumeRequest for ConsumeMessagePopConcurrentlyService🔥#1654rocketmq-rust-bot merged 1 commit intomainfrom
Conversation
WalkthroughThe changes in this pull request primarily focus on enhancing the Changes
Assessment against linked issues
Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1654 +/- ##
==========================================
- Coverage 27.42% 27.36% -0.06%
==========================================
Files 466 466
Lines 62306 62442 +136
==========================================
Hits 17089 17089
- Misses 45217 45353 +136 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (2)
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs (2)
223-231: Adjust indentation for better readabilityThere is an indentation issue in the logging statement that spans multiple lines. Correct the indentation to enhance code readability.
Apply this diff to fix the indentation:
if self.is_pop_timeout() { info!( - "the pop message time out so abort consume. popTime={} invisibleTime={}, group={} \ - {}", + "the pop message time out so abort consume. popTime={} invisibleTime={}, group={} {}", self.pop_time, self.invisible_time, self.consumer_group, self.message_queue ); self.process_queue.dec_found_msg(self.msgs.len()); return; }
290-305: Simplify the logic in determiningreturn_typeThe logic for setting
return_typecan be simplified for better readability. Consider restructuring the conditions to make the code clearer.Apply this diff to simplify the logic:
if status.is_none() { if has_exception { return_type = ConsumeReturnType::Exception; } else { return_type = ConsumeReturnType::ReturnNull; } -} else if consume_rt - > default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000 -{ - return_type = ConsumeReturnType::TimeOut; -} else if status.unwrap() == ConsumeConcurrentlyStatus::ReconsumeLater { - return_type = ConsumeReturnType::Failed; -} else if status.unwrap() == ConsumeConcurrentlyStatus::ConsumeSuccess { - return_type = ConsumeReturnType::Success; -} +} else { + if consume_rt > default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000 { + return_type = ConsumeReturnType::TimeOut; + } else { + match status.unwrap() { + ConsumeConcurrentlyStatus::ReconsumeLater => { + return_type = ConsumeReturnType::Failed; + } + ConsumeConcurrentlyStatus::ConsumeSuccess => { + return_type = ConsumeReturnType::Success; + } + } + } +}
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (1)
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs(4 hunks)
🔇 Additional comments (4)
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs (4)
313-315: Ensure status is always set before use
Setting status to ConsumeConcurrentlyStatus::ReconsumeLater when it is None ensures that subsequent calls to unwrap() on status are safe. Good job handling this potential issue.
317-323: Safe usage of consume_message_context with proper checks
Calling unwrap() on consume_message_context after checking default_mqpush_consumer_impl.has_hook() ensures that it is safe to use. This is a correct usage pattern.
42-43: Remove unused import if not necessary
The import of consume_return_type::ConsumeReturnType is used in the code, which is good. However, ensure that all imported modules are necessary. If any imports become unused due to code changes, consider removing them to keep the code clean.
224-231: Clarify timeout logic in is_pop_timeout method
The timeout check in is_pop_timeout relies on pop_time and invisible_time. Ensure that the logic correctly accounts for potential edge cases, such as system clock changes or overflow.
Run the following script to verify the correctness of the timeout calculation:
✅ Verification successful
Let me gather more information about the timeout logic implementation.
Let me check the implementation of is_pop_timeout and related time utilities.
Based on the analysis of the code, I can now generate the final response:
Timeout logic in is_pop_timeout is correctly implemented
The timeout calculation is safely implemented using saturating_sub to prevent integer overflow, and get_current_millis() uses the system's monotonic clock through SystemTime::now(). The comparison with invisible_time is also properly handled with appropriate null checks for empty messages and zero values.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any integer overflow or potential issues in timeout logic.
# Search for usage of `saturating_sub` to identify potential overflows
rg --fixed-strings 'saturating_sub' --context 5
Length of output: 4126
Script:
#!/bin/bash
# Check the implementation of get_current_millis and related time utilities
rg "get_current_millis" -A 5 -B 5
# Also check for any other time-related comparisons in the same file
ast-grep --pattern 'if $_ >= $_' rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs
Length of output: 77308
Script:
# Check the implementation of is_pop_timeout and time-related utilities
ast-grep --pattern 'fn is_pop_timeout($$$)' rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs
# Also check the implementation of get_current_millis
rg "pub fn get_current_millis" -A 5 -B 5
Length of output: 900
| unimplemented!( | ||
| "ConsumeMessagePopConcurrentlyService.submit_consume_request is not supported" | ||
| ) |
There was a problem hiding this comment.
Avoid using unimplemented!() for unsupported methods
Using unimplemented!() will cause a runtime panic if the submit_consume_request method is called. If this method is not intended to be supported, consider removing it from the trait implementation or provide a proper implementation that handles the unsupported operation gracefully.
Apply this diff to handle the unsupported method without causing a panic:
async fn submit_consume_request(
&self,
this: ArcMut<Self>,
msgs: Vec<ArcMut<MessageClientExt>>,
process_queue: Arc<ProcessQueue>,
message_queue: MessageQueue,
dispatch_to_consume: bool,
) {
- unimplemented!(
- "ConsumeMessagePopConcurrentlyService.submit_consume_request is not supported"
- )
+ // This service does not support `submit_consume_request`.
+ tracing::warn!(
+ "submit_consume_request is not supported for ConsumeMessagePopConcurrentlyService"
+ );
+ // Optionally, handle the unsupported operation appropriately here.
}Committable suggestion skipped: line range outside the PR's diff.
| impl ConsumeMessagePopConcurrentlyService { | ||
| async fn process_consume_result( | ||
| &mut self, | ||
| this: ArcMut<Self>, | ||
| status: ConsumeConcurrentlyStatus, | ||
| context: &ConsumeConcurrentlyContext, | ||
| consume_request: &mut ConsumeRequest, | ||
| ) { | ||
| unimplemented!("ConsumeMessagePopConcurrentlyService.process_consume_result") | ||
| } | ||
| } |
There was a problem hiding this comment.
Implement or properly handle process_consume_result method
The process_consume_result method is currently unimplemented, which will cause a panic if invoked. This method is crucial for processing the results of message consumption. Please provide an implementation or handle the method appropriately to avoid runtime panics.
Apply this diff to at least log a warning instead of panicking:
impl ConsumeMessagePopConcurrentlyService {
async fn process_consume_result(
&mut self,
this: ArcMut<Self>,
status: ConsumeConcurrentlyStatus,
context: &ConsumeConcurrentlyContext,
consume_request: &mut ConsumeRequest,
) {
- unimplemented!("ConsumeMessagePopConcurrentlyService.process_consume_result")
+ // TODO: Implement the logic for processing consume results.
+ tracing::warn!("process_consume_result is not yet implemented");
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| impl ConsumeMessagePopConcurrentlyService { | |
| async fn process_consume_result( | |
| &mut self, | |
| this: ArcMut<Self>, | |
| status: ConsumeConcurrentlyStatus, | |
| context: &ConsumeConcurrentlyContext, | |
| consume_request: &mut ConsumeRequest, | |
| ) { | |
| unimplemented!("ConsumeMessagePopConcurrentlyService.process_consume_result") | |
| } | |
| } | |
| impl ConsumeMessagePopConcurrentlyService { | |
| async fn process_consume_result( | |
| &mut self, | |
| this: ArcMut<Self>, | |
| status: ConsumeConcurrentlyStatus, | |
| context: &ConsumeConcurrentlyContext, | |
| consume_request: &mut ConsumeRequest, | |
| ) { | |
| // TODO: Implement the logic for processing consume results. | |
| tracing::warn!("process_consume_result is not yet implemented"); | |
| } | |
| } |
| unimplemented!() | ||
| } |
There was a problem hiding this comment.
Provide an implementation for ConsumeRequest::new
The constructor ConsumeRequest::new is currently unimplemented. This method is essential for creating instances of ConsumeRequest. Ensure all required fields are initialized properly.
Apply this diff to implement the constructor:
pub fn new(
msgs: Vec<ArcMut<MessageClientExt>>,
process_queue: Arc<PopProcessQueue>,
message_queue: MessageQueue,
pop_time: u64,
invisible_time: u64,
+ consumer_group: CheetahString,
+ message_listener: ArcBoxMessageListenerConcurrently,
+ default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
) -> Self {
- unimplemented!()
+ Self {
+ msgs,
+ process_queue,
+ message_queue,
+ pop_time,
+ invisible_time,
+ consumer_group,
+ message_listener,
+ default_mqpush_consumer_impl,
+ }
}Also, update the method signature to include the missing parameters consumer_group, message_listener, and default_mqpush_consumer_impl.
Committable suggestion skipped: line range outside the PR's diff.
| self.default_mqpush_consumer_impl.as_ref().unwrap().clone(); | ||
| default_mqpush_consumer_impl | ||
| .reset_retry_and_namespace(&mut self.msgs, self.consumer_group.as_str()); |
There was a problem hiding this comment.
Handle potential None value for default_mqpush_consumer_impl
Using unwrap() on self.default_mqpush_consumer_impl can cause a panic if it is None. Consider handling the None case to prevent potential runtime errors.
Apply this diff to handle the None case:
let mut default_mqpush_consumer_impl = self.default_mqpush_consumer_impl.as_ref();
-match default_mqpush_consumer_impl {
- Some(impl_ref) => {
- let mut impl_clone = impl_ref.clone();
- impl_clone.reset_retry_and_namespace(&mut self.msgs, self.consumer_group.as_str());
- }
- None => {
- tracing::error!("default_mqpush_consumer_impl is None");
- return;
- }
-}
+if let Some(impl_ref) = default_mqpush_consumer_impl {
+ let mut impl_clone = impl_ref.clone();
+ impl_clone.reset_retry_and_namespace(&mut self.msgs, self.consumer_group.as_str());
+} else {
+ tracing::error!("default_mqpush_consumer_impl is None");
+ return;
+}Committable suggestion skipped: line range outside the PR's diff.
Which Issue(s) This PR Fixes(Closes)
Fixes #1653
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
ConsumeRequestfor improved message consumption handling.