[ISSUE #1598]🚀Rocketmq-client support Pop consumer-1#1600
[ISSUE #1598]🚀Rocketmq-client support Pop consumer-1#1600rocketmq-rust-bot merged 1 commit intomainfrom
Conversation
WalkthroughThe pull request introduces several changes across multiple files in the RocketMQ client implementation. Key modifications include altering method signatures to use immutable references, changing data types from Changes
Assessment against linked issues
Possibly related issues
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Outside diff range and nitpick comments (3)
rocketmq-client/src/implementation/client_remoting_processor.rs (3)
Line range hint
107-134: Refactor duplicated error handling logicThe error handling code for parsing socket addresses is duplicated for both
born_hostandstore_host. Consider extracting this into a helper function.+ fn parse_socket_addr(addr: &str, field_name: &str) -> Result<SocketAddr, RemotingCommand> { + match addr.parse::<SocketAddr>() { + Ok(value) => Ok(value), + Err(_) => { + warn!("parse {} failed: {}", field_name, addr); + Err(RemotingCommand::create_response_command() + .set_code(ResponseCode::SystemError) + .set_remark(&format!("parse {} failed", field_name))) + } + } + }Then use it like:
msg.born_host = parse_socket_addr(&request_header.born_host, "born_host")?; msg.store_host = parse_socket_addr(&request_header.store_host, "store_host")?;
Line range hint
135-147: Improve error handling for message decompressionThe current implementation silently falls back to using the compressed body when decompression fails. This could lead to issues downstream. Consider either:
- Propagating the decompression error
- Logging more details about the failure
- Adding metrics for failed decompressions
- if let Ok(decompressed) = de_result { - msg.message.body = Some(decompressed); - } else { - warn!("err when uncompress constant"); - msg.message.body = body.cloned(); - } + match de_result { + Ok(decompressed) => { + msg.message.body = Some(decompressed); + } + Err(e) => { + warn!("Failed to decompress message: {}", e); + return Ok(Some( + response + .set_code(ResponseCode::SystemError) + .set_remark(&format!("Failed to decompress message: {}", e)), + )); + } + }
Line range hint
171-190: Enhance observability for reply message processingConsider adding more detailed logging and metrics to track:
- Success/failure rates of reply message processing
- Latency between request and reply
- Number of unmatched replies
async fn process_reply_message(reply_msg: MessageExt) { + let start = std::time::Instant::now(); let correlation_id = reply_msg .message .get_property(&CheetahString::from_static_str( MessageConst::PROPERTY_CORRELATION_ID, )) .unwrap_or_default(); if let Some(request_response_future) = REQUEST_FUTURE_HOLDER .get_request(correlation_id.as_str()) .await { request_response_future.put_response_message(Some(Box::new(reply_msg))); if request_response_future.get_request_callback().is_some() { request_response_future.on_success(); } + debug!( + "Successfully processed reply message for correlation_id={} in {:?}", + correlation_id, + start.elapsed() + ); } else { - warn!( + error!( "receive reply message, but not matched any request, CorrelationId: {} , reply \ from host: {}", correlation_id, reply_msg.born_host ); } }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (6)
rocketmq-client/src/consumer/consumer_impl/pop_process_queue.rs(1 hunks)rocketmq-client/src/consumer/consumer_impl/pop_request.rs(5 hunks)rocketmq-client/src/consumer/consumer_impl/re_balance.rs(2 hunks)rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs(6 hunks)rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs(2 hunks)rocketmq-client/src/implementation/client_remoting_processor.rs(1 hunks)
🔇 Additional comments (5)
rocketmq-client/src/consumer/consumer_impl/re_balance.rs (2)
46-50: Method signature updated appropriately
The method remove_unnecessary_pop_message_queue now accepts references to MessageQueue and PopProcessQueue, improving performance by avoiding unnecessary cloning.
72-72: Converted dispatch_pop_pull_request method to async
The dispatch_pop_pull_request method is now asynchronous, enabling non-blocking execution and better performance during pop pull request dispatching.
rocketmq-client/src/consumer/consumer_impl/pop_request.rs (1)
29-30: Updated types to CheetahString for topic and consumer_group
Changing the types of topic and consumer_group to CheetahString can improve performance due to lighter string handling.
rocketmq-client/src/implementation/client_remoting_processor.rs (2)
84-84: LGTM: Clean integration of POP reply message handling
The new request code case is properly integrated into the existing match statement and follows the established pattern.
Line range hint 135-147: Verify security implications of message decompression
The code decompresses messages from potentially untrusted sources. Please verify:
- The maximum allowed size for decompressed messages
- Protection against zip bomb attacks
- Resource limits during decompression
✅ Verification successful
Let me search for more specific information about message size limits and security controls.
Based on the search results, I can now provide a final response:
Message decompression has adequate security controls in place
The codebase implements several safeguards against decompression attacks:
- Maximum message size limit of 4MB (1024 * 1024 * 4) is enforced by default in both producer and store configurations
- Message size validation occurs before decompression in
MessageExtEncoder - Messages exceeding size limits are rejected with
MessageIllegalstatus - Compression/decompression is handled by well-tested standard libraries (zlib, lz4, zstd) with built-in safeguards
- CRC32 checksums are used to verify message integrity
The implementation follows RocketMQ's standard security practices for handling compressed messages from untrusted sources.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for compression-related security measures
rg -A 5 "CompressorFactory|decompress|compress"
# Look for any size limits or security configurations
rg -A 5 "max.*size|limit|security"
Length of output: 75388
Script:
#!/bin/bash
# Look for message size limits and security controls
rg -A 5 "max_message_size|max_body_size|max_decompressed|max_compressed"
# Search for any security-related configuration or validation
rg -A 5 "validate|check|verify|limit|security" rocketmq-common/src/common/compression/
Length of output: 19009
| fn remove_unnecessary_pop_message_queue( | ||
| &mut self, | ||
| mq: &MessageQueue, | ||
| pq: &PopProcessQueue, | ||
| ) -> bool { |
There was a problem hiding this comment.
Implement remove_unnecessary_pop_message_queue method
The remove_unnecessary_pop_message_queue method is currently unimplemented and contains a todo!() placeholder. Please provide the necessary implementation.
Do you need assistance implementing this method, or should I open a GitHub issue to track this task?
| async fn dispatch_pop_pull_request(&self, pull_request_list: Vec<PopRequest>, delay: u64) { | ||
| todo!() | ||
| } |
There was a problem hiding this comment.
Implement dispatch_pop_pull_request method
The dispatch_pop_pull_request method is currently unimplemented and contains a todo!() placeholder. Implementing this method is essential for proper dispatching of pop pull requests.
Would you like assistance in implementing this method, or should I open a GitHub issue to track this task?
| let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade(); | ||
| if sub_rebalance_impl.is_none() { | ||
| return false; | ||
| } | ||
| let mut sub_rebalance_impl = sub_rebalance_impl.unwrap(); |
There was a problem hiding this comment.
Handle None case without unwrapping Option
In the update_message_queue_assignment method, unwrapping self.sub_rebalance_impl without checking for None can lead to a panic. Safely handle the None case to maintain robustness.
Apply this diff to fix the issue:
- let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade();
- if sub_rebalance_impl.is_none() {
+ let sub_rebalance_impl = match self.sub_rebalance_impl.as_mut() {
+ Some(sub_rebalance) => sub_rebalance.upgrade(),
+ None => {
+ error!("sub_rebalance_impl is None");
+ return false;
+ }
+ };
+ if sub_rebalance_impl.is_none() {
return false;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade(); | |
| if sub_rebalance_impl.is_none() { | |
| return false; | |
| } | |
| let mut sub_rebalance_impl = sub_rebalance_impl.unwrap(); | |
| let sub_rebalance_impl = match self.sub_rebalance_impl.as_mut() { | |
| Some(sub_rebalance) => sub_rebalance.upgrade(), | |
| None => { | |
| error!("sub_rebalance_impl is None"); | |
| return false; | |
| } | |
| }; | |
| if sub_rebalance_impl.is_none() { | |
| return false; | |
| } | |
| let mut sub_rebalance_impl = sub_rebalance_impl.unwrap(); |
| let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade(); | ||
| if sub_rebalance_impl.is_none() { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Avoid potential panic by handling Option without unwrapping
At line 282, using unwrap() on self.sub_rebalance_impl.as_mut() can cause a panic if self.sub_rebalance_impl is None. Consider handling the None case to prevent potential crashes.
Apply this diff to fix the issue:
- let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade();
- if sub_rebalance_impl.is_none() {
+ let sub_rebalance_impl = match self.sub_rebalance_impl.as_mut() {
+ Some(sub_rebalance) => sub_rebalance.upgrade(),
+ None => {
+ error!("sub_rebalance_impl is None");
+ return false;
+ }
+ };
+ if sub_rebalance_impl.is_none() {
return false;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let sub_rebalance_impl = self.sub_rebalance_impl.as_mut().unwrap().upgrade(); | |
| if sub_rebalance_impl.is_none() { | |
| return false; | |
| } | |
| let sub_rebalance_impl = match self.sub_rebalance_impl.as_mut() { | |
| Some(sub_rebalance) => sub_rebalance.upgrade(), | |
| None => { | |
| error!("sub_rebalance_impl is None"); | |
| return false; | |
| } | |
| }; | |
| if sub_rebalance_impl.is_none() { | |
| return false; | |
| } |
| pub(crate) fn set_dropped(&self, dropped: bool) { | ||
| //self.dropped = dropped; |
There was a problem hiding this comment.
set_dropped method does not update the dropped flag
The set_dropped method no longer updates the dropped field because the assignment is commented out, and the method now takes &self instead of &mut self. This prevents the method from performing its intended function.
Consider restoring the ability to modify the dropped flag by accepting &mut self and uncommenting the assignment.
Apply this diff to fix the issue:
-pub(crate) fn set_dropped(&self, dropped: bool) {
- //self.dropped = dropped;
+pub(crate) fn set_dropped(&mut self, dropped: bool) {
+ self.dropped = dropped;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub(crate) fn set_dropped(&self, dropped: bool) { | |
| //self.dropped = dropped; | |
| pub(crate) fn set_dropped(&mut self, dropped: bool) { | |
| self.dropped = dropped; |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1600 +/- ##
==========================================
- Coverage 25.49% 25.38% -0.11%
==========================================
Files 459 459
Lines 60299 60554 +255
==========================================
Hits 15371 15371
- Misses 44928 45183 +255 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1598
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Refactor
Documentation