[ISSUE #1004]🚀optimize and improve consume logic⚡️#1008
Conversation
WalkthroughThe changes in this pull request introduce significant modifications to the RocketMQ client implementation, particularly focusing on enhancing the locking and unlocking mechanisms for message queues. New asynchronous methods are added to various structs and traits, improving the handling of message consumption and queue management. Additionally, new request and response structures are introduced to facilitate batch operations for locking and unlocking message queues. Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
📜 Recent review detailsConfiguration used: .coderabbit.yaml ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (7)
🚧 Files skipped from review as they are similar to previous changes (5)
🔇 Additional comments (2)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1008 +/- ##
==========================================
- Coverage 19.95% 19.92% -0.04%
==========================================
Files 413 418 +5
Lines 34440 34502 +62
==========================================
+ Hits 6874 6876 +2
- Misses 27566 27626 +60 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Actionable comments posted: 22
🧹 Outside diff range and nitpick comments (18)
rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs (1)
23-27: Consider adding documentation and reviewing trait derivations.The
LockBatchResponseBodystruct looks good overall, but consider the following suggestions:
- Add documentation comments (///) for the struct and its field to improve API clarity.
- The
#[serde(rename = "lockOKMQSet")]attribute uses camelCase, which is atypical for Rust. If this is for compatibility with an external JSON format, consider adding a comment explaining this choice.- Review if the
Defaulttrait derivation is necessary. If there's no clear default state for this response body, it might be better to omit it to avoid potential misuse.Example improvement:
/// Represents the response body for a batch locking operation in RocketMQ. #[derive(Serialize, Deserialize, Debug)] pub struct LockBatchResponseBody { /// Set of message queues that were successfully locked. #[serde(rename = "lockOKMQSet")] // Note: camelCase used for compatibility with external JSON format pub lock_ok_mq_set: HashSet<MessageQueue>, }rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1)
24-31: LGTM! Consider adding documentation.The
LockBatchRequestBodystruct is well-designed with appropriate field types and derive macros. The use ofOption<String>forconsumer_groupandclient_id, andHashSet<MessageQueue>formq_setare good choices.Consider adding documentation comments to the struct and its fields to improve code readability and maintainability. For example:
/// Represents a batch request for locking message queues. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct LockBatchRequestBody { /// The consumer group making the lock request. pub consumer_group: Option<String>, /// The ID of the client making the lock request. pub client_id: Option<String>, /// Whether to restrict the operation to the current broker only. pub only_this_broker: bool, /// The set of message queues to be locked. pub mq_set: HashSet<MessageQueue>, }rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs (2)
24-31: LGTM: Struct definition is well-structured. Consider adding documentation.The
UnlockBatchRequestBodystruct is correctly defined with appropriate field types and derive macros. The use ofOption<String>for optional fields andHashSet<MessageQueue>for unique message queues is a good design choice.Consider adding documentation comments (///) for the struct and its fields to improve code readability and maintainability. For example:
/// Represents a request body for unlocking a batch of messages. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct UnlockBatchRequestBody { /// The consumer group associated with the unlock request. pub consumer_group: Option<String>, /// The client ID associated with the unlock request. pub client_id: Option<String>, /// Indicates whether the operation should be restricted to the current broker. pub only_this_broker: bool, /// The set of message queues to be unlocked. pub mq_set: HashSet<MessageQueue>, }
33-45: LGTM: Display trait implementation is correct. Minor style improvement suggested.The Display trait implementation for UnlockBatchRequestBody is well-structured and correctly handles optional fields. It provides a comprehensive string representation of the struct.
Consider using the
format!macro instead ofwrite!for better readability:impl Display for UnlockBatchRequestBody { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", format!( "UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, mq_set={:?}]", self.consumer_group.as_deref().unwrap_or(""), self.client_id.as_deref().unwrap_or(""), self.only_this_broker, self.mq_set )) } }This change also uses
as_deref()instead ofas_ref()for a more idiomatic approach to handlingOption<String>.rocketmq-remoting/src/protocol/header/lock_batch_mq_request_header.rs (2)
26-31: LGTM! Consider adding documentation.The
LockBatchMqRequestHeaderstruct is well-defined with appropriate derive macros and serde attributes. The use ofOption<RpcRequestHeader>provides flexibility.Consider adding documentation comments to describe the purpose of this struct and its field. This would improve code readability and maintainability. For example:
/// Represents the header for a batch lock request in the RocketMQ protocol. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct LockBatchMqRequestHeader { /// The RPC request header, flattened into the JSON structure. #[serde(flatten)] pub rpc_request_header: Option<RpcRequestHeader>, }
33-43: LGTM! Consider handling empty map case.The implementation of
CommandCustomHeaderforLockBatchMqRequestHeaderis correct and efficiently handles the optionalrpc_request_header.Consider returning
Noneif the resulting map is empty. This could provide more meaningful information to the caller. Here's a suggested implementation:impl CommandCustomHeader for LockBatchMqRequestHeader { fn to_map(&self) -> Option<HashMap<String, String>> { self.rpc_request_header .as_ref() .and_then(|header| header.to_map()) .filter(|map| !map.is_empty()) } }This implementation is more concise and returns
Noneifrpc_request_headerisNoneor if itsto_map()method returns an empty map.rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs (1)
26-31: LGTM: Struct definition is correct. Consider adding documentation.The
UnlockBatchMqRequestHeaderstruct is well-defined with appropriate serde attributes. The use ofOption<RpcRequestHeader>is a good choice for handling cases where the header might not be present.Consider adding documentation comments to explain the purpose of this struct and its field. For example:
/// Represents a request header for unlocking a batch of message queues. #[derive(Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct UnlockBatchMqRequestHeader { /// The RPC request header, if present. #[serde(flatten)] pub rpc_request_header: Option<RpcRequestHeader>, }rocketmq-client/src/consumer/consumer_impl/re_balance.rs (3)
Line range hint
47-51: Possible Redundancy in Function NameThe method
remove_unnecessary_pop_message_queue_pophas "pop" repeated twice in its name, which may be redundant or a typo. Consider renaming it toremove_unnecessary_pop_message_queueorremove_unnecessary_pop_process_queuefor clarity.
61-61: Inconsistent Return Types betweencompute_pull_from_whereMethodsThe method
compute_pull_from_where_with_exceptionreturns aResult<i64>, whereascompute_pull_from_wherereturns ani64directly. For consistency and better error handling, consider havingcompute_pull_from_wherealso return aResult<i64>.
65-67: Inconsistent Asynchronicity Between Dispatch MethodsThe method
dispatch_pull_requestis asynchronous (async fn), whiledispatch_pop_pull_requestis synchronous (fn). If both methods perform operations that could be asynchronous, consider makingdispatch_pop_pull_requestasynchronous for consistency and maintainability.rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1)
470-476: Use appropriate logging level for successful operationsIn the success case of the
unlockmethod, you are usingwarn!to log the message indicating a successful unlock:warn!( "unlock messageQueue. group:{}, clientId:{}, mq:{}", self.rebalance_impl_inner.consumer_group.as_ref().unwrap(), client.client_id, mq )Logging successful operations at the warning level can be misleading. Consider using
info!instead to indicate normal operation.Apply this diff to change the logging level:
} else { - warn!( + info!( "unlock messageQueue. group:{}, clientId:{}, mq:{}", self.rebalance_impl_inner.consumer_group.as_ref().unwrap(), client.client_id, mq ) }rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (7)
20-20: ImportDerefMutmay not be necessaryThe import
use std::ops::DerefMut;is added, but it's only used once withderef_mut()in the code. Consider removing the explicit call toderef_mut()and use pattern matching or adjust the code to avoid needing this import if possible.
Line range hint
242-253: Check for proper lock acquisition inupdate_process_queue_table_in_rebalanceAt line 242, the code attempts to lock message queues if
is_orderis true. Thelockmethod is now asynchronous and modifiesprocess_queue_table. Ensure that the mutable borrow ofprocess_queue_tabledoes not conflict with other borrows in the loop.Consider restructuring the code to prevent mutable and immutable borrows from overlapping. Alternatively, move the lock acquisition outside the loop or collect the queues to lock and process them after releasing the write lock.
469-522: New methodlock_alllacks error handling for broker unavailabilityIn the
lock_allmethod, iffind_broker_resultisNone, the code silently continues without logging or handling the error. This might make debugging broker connectivity issues difficult.Consider adding a warning log when
find_broker_resultisNone:if let Some(find_broker_result) = find_broker_result { // existing code } else { warn!( "Could not find broker address for broker: {}", broker_name ); }
445-463: Improve error handling inlockmethodWhen handling the result of
lock_batch_mq, the error case at lines 459-462 only logs the error but does not provide detailed context.Enhance the error logging to include more context:
error!( "Failed to lock batch MQ for {:?} at broker {}: {}", self.consumer_group, find_broker_result.broker_addr, e );
185-200: Incorrect logging level and message formattingAt lines 194-200, an
error!macro is used for logging. The message seems to indicate an attempt to fix an issue rather than an actual error.Consider changing the logging level to
warn!and improving the message clarity.-error!( +warn!( "doRebalance, {:?}, removing unnecessary mq: {}, because pull is paused.", self.consumer_group, mq.get_topic() );
Line range hint
242-253: Avoid holding write lock during asynchronous operationsAt line 239, a write lock on
process_queue_tableis acquired. Inside the loop starting at line 241, asynchronous operations are performed, including calls toself.lock. Holding the write lock across awaits can lead to contention and potential deadlocks.Refactor the code to minimize the time the write lock is held. Collect necessary data, release the lock, and then perform asynchronous operations.
469-522: Lack of timeout handling forlock_batch_mqrequestsThe
lock_batch_mqmethod is called with a timeout of1_000milliseconds. If the broker does not respond in time, the method will fail silently for that broker.Consider implementing retries or exponential backoff when a timeout occurs to improve reliability.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (14)
- rocketmq-client/src/consumer/consumer_impl/process_queue.rs (5 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance.rs (2 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (5 hunks)
- rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (4 hunks)
- rocketmq-client/src/implementation/mq_client_api_impl.rs (3 hunks)
- rocketmq-remoting/src/protocol/body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/request.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/response.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (2 hunks)
- rocketmq-remoting/src/protocol/header/lock_batch_mq_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs (1 hunks)
🔇 Additional comments (24)
rocketmq-remoting/src/protocol/body/request.rs (1)
17-17: LGTM! New module added for batch locking requests.The addition of the
lock_batch_request_bodymodule aligns with the PR objectives of optimizing and improving consume logic. This new module likely encapsulates functionality for handling batch lock requests, which could potentially improve efficiency in message queue management.To ensure the proper implementation of this new module, please run the following verification script:
This script will help verify:
- The existence of the implementation file
- The basic structure of the
LockBatchRequestBody- Any usage of the new module in other parts of the codebase
Please review the results to ensure the new module is properly implemented and integrated.
✅ Verification successful
It seems there was an error in the verification script when checking for the implementation file. Let's correct the
fdcommand and rerun the tests to ensure everything is in order.
Verification Successful 🚀
The
lock_batch_request_bodymodule is properly implemented and integrated into the codebase. TheLockBatchRequestBodystruct is correctly defined, and the module is actively used in the following locations:
rocketmq-client/src/implementation/mq_client_api_impl.rsrocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rsAll tests passed, and no issues were found.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation of the new lock_batch_request_body module # Test 1: Check if the implementation file exists echo "Checking for the implementation file..." fd --type f --full-path "*/lock_batch_request_body.rs" # Test 2: Verify the contents of the implementation file echo "Checking the contents of the implementation file..." rg --type rust "pub struct LockBatchRequestBody" -A 10 # Test 3: Check for any usage of the new module echo "Checking for usage of the new module..." rg --type rust "use .*lock_batch_request_body::LockBatchRequestBody"Length of output: 2210
Script:
#!/bin/bash # Description: Corrected verification of the new lock_batch_request_body module # Test 1: Check if the implementation file exists echo "Checking for the implementation file..." fd --glob '*lock_batch_request_body.rs' # Test 2: Verify the contents of the implementation file echo "Checking the contents of the implementation file..." rg --type rust "pub struct LockBatchRequestBody" -A 10 # Test 3: Check for any usage of the new module echo "Checking for usage of the new module..." rg --type rust "use .*lock_batch_request_body::LockBatchRequestBody"Length of output: 2023
rocketmq-remoting/src/protocol/body/response.rs (2)
1-17: LGTM: License header is correctly included.The Apache License 2.0 header is properly included at the beginning of the file, which is essential for maintaining compliance with open-source licensing requirements.
17-17: Verify the implementation of the new module.The addition of the
lock_batch_response_bodymodule is a good step towards expanding the functionality for batch locking operations. However, it's important to ensure that the implementation of this module is complete and consistent with the project's standards.To verify the implementation, please run the following script:
This script will help verify:
- The existence of the module file.
- The presence of expected structures or types within the module.
- The existence of related unit tests.
Please review the output of this script to ensure the module is properly implemented and tested.
rocketmq-remoting/src/protocol/body/response/lock_batch_response_body.rs (2)
1-16: LGTM: Appropriate license header included.The file includes the correct Apache License, Version 2.0 header, which is essential for open-source projects and consistent with the RocketMQ project's licensing.
17-21: LGTM: Appropriate imports.The imports are concise and relevant to the struct being defined:
HashSetfrom the standard library for the main field type.MessageQueuefrom a custom module, likely representing the elements in the set.SerializeandDeserializetraits from serde for JSON serialization/deserialization.These imports provide all necessary types and traits for the struct's functionality.
rocketmq-remoting/src/protocol/body.rs (1)
33-34: Approved: New modules align with PR objectives. Please provide more context.The addition of
request,response, andunlock_batch_request_bodymodules aligns with the PR objectives to optimize and improve consume logic. These new modules likely introduce structures for enhanced request-response handling and batch unlocking operations, which could significantly improve the efficiency of message queue management.To better understand the impact of these changes, could you please provide more information about the contents and purpose of these new modules? This will help ensure they integrate well with the existing codebase.
To verify the integration and usage of these new modules, please run the following script:
This script will help us understand how these new modules are being used throughout the codebase, ensuring they're properly integrated and not introducing any unintended side effects.
Also applies to: 37-37
✅ Verification successful
Verified: New modules are properly integrated and utilized within the codebase.
The
request,response, andunlock_batch_request_bodymodules are being used appropriately in the following locations:
rocketmq-client/src/implementation/mq_client_api_impl.rsrocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rsrocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rsNo issues were found regarding the integration of these modules.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for usage of new modules in the codebase echo "Checking usage of new request module:" rg --type rust -g '!src/protocol/body.rs' 'use .*protocol::body::request' echo "Checking usage of new response module:" rg --type rust -g '!src/protocol/body.rs' 'use .*protocol::body::response' echo "Checking usage of new unlock_batch_request_body module:" rg --type rust -g '!src/protocol/body.rs' 'use .*protocol::body::unlock_batch_request_body' echo "Checking for any direct references to the new modules:" rg --type rust -g '!src/protocol/body.rs' 'body::(request|response|unlock_batch_request_body)'Length of output: 2331
rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs (1)
1-45: LGTM! Well-structured and follows best practices.The overall structure of the file is good, and it adheres to Rust best practices:
- Appropriate license header is included.
- Imports are well-organized and grouped.
- The code follows Rust naming conventions.
- The use of external crates and types is appropriate.
- The visibility of the struct and its implementation is correctly set to public.
rocketmq-remoting/src/protocol/body/unlock_batch_request_body.rs (2)
1-16: LGTM: License header is correct and properly formatted.The Apache License 2.0 header is present and correctly formatted at the beginning of the file.
17-22: LGTM: Imports are correct and well-organized.The necessary imports from std, rocketmq_common, and serde are present and logically grouped. There are no unused imports.
rocketmq-remoting/src/protocol/header/unlock_batch_mq_request_header.rs (2)
1-24: LGTM: License and imports are correctly implemented.The file includes the appropriate Apache 2.0 license header, and the import statements are well-organized and relevant to the functionality implemented in this file.
1-52: Overall assessment: Good implementation with room for minor improvements.The
UnlockBatchMqRequestHeaderstruct and its trait implementations provide a solid foundation for handling batch unlock requests in RocketMQ. The code is generally well-structured and follows Rust conventions.To further enhance the quality of this implementation, consider:
- Adding documentation comments to explain the purpose and usage of the struct and its methods.
- Optimizing the
to_mapmethod in theCommandCustomHeaderimplementation for better efficiency.- Improving error handling in the
FromMapimplementation to handle cases whereRpcRequestHeadercreation fails.These improvements will make the code more robust, maintainable, and easier to use for other developers working on the project.
rocketmq-remoting/src/protocol/header.rs (3)
36-36: Request for additional context on batch locking/unlocking improvements.The additions of
lock_batch_mq_request_headerandunlock_batch_mq_request_headermodules suggest improvements in batch operations for message queue locking. However, the PR description lacks details about the specific enhancements these new modules bring.Could you please provide more information on:
- The motivation behind introducing batch locking/unlocking.
- The expected performance improvements or other benefits.
- Any potential impacts on existing consume logic.
This context will help in better understanding the optimization efforts and their alignment with the project's goals.
Also applies to: 50-50
36-36: LGTM: New module for batch locking of message queues.The addition of the
lock_batch_mq_request_headermodule aligns with the PR objective of optimizing and improving consume logic. Its placement in the file follows the alphabetical order convention.To ensure the module is properly implemented, please run the following script:
#!/bin/bash # Description: Verify the implementation of the lock_batch_mq_request_header module # Test: Check if the module file exists and contains expected content rg --type rust -g 'lock_batch_mq_request_header.rs' -C 5 'struct LockBatchMqRequestHeader'
50-50: LGTM: New module for batch unlocking of message queues.The addition of the
unlock_batch_mq_request_headermodule complements thelock_batch_mq_request_headermodule and aligns with the PR objective of optimizing and improving consume logic. Its placement in the file follows the alphabetical order convention.To ensure the module is properly implemented, please run the following script:
✅ Verification successful
Verified:
unlock_batch_mq_request_headermodule is correctly implemented.The
unlock_batch_mq_request_headermodule contains the expectedUnlockBatchMqRequestHeaderstruct, confirming its proper implementation and alignment with the project's objectives.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation of the unlock_batch_mq_request_header module # Test: Check if the module file exists and contains expected content rg --type rust -g 'unlock_batch_mq_request_header.rs' -C 5 'struct UnlockBatchMqRequestHeader'Length of output: 1256
rocketmq-client/src/consumer/consumer_impl/process_queue.rs (4)
128-130: LGTM: New methodinc_try_unlock_timeslooks good.The new
inc_try_unlock_timesmethod is a clean and thread-safe way to increment thetry_unlock_timescounter. It uses atomic operations, which is appropriate for this use case.
316-319: LGTM: New methodset_last_lock_timestampis well-implemented.The
set_last_lock_timestampmethod is a clean and thread-safe way to update thelast_lock_timestamp. It correctly uses an atomic store operation, which is appropriate for this concurrent scenario.
Line range hint
1-334: Summary: Changes align well with PR objectives.The modifications in this file, including the switch to
RocketMQTokioRwLockand the addition of methods for tracking unlock attempts and lock timestamps, are consistent with the PR's goal of optimizing and improving the consume logic. These changes enhance the locking mechanism and provide better management of concurrent operations, which should contribute to improved performance and reliability of the message consumption process.
33-33: Approve the change toRocketMQTokioRwLockand verify its impact.The change from
Arc<RwLock<()>>toArc<RocketMQTokioRwLock<()>>for theconsume_lockfield is a good optimization for asynchronous operations. This custom lock is likely designed to work more efficiently with the Tokio runtime.To ensure this change doesn't introduce any unexpected behavior, please run the following verification:
This will help identify any places where the standard
RwLockis still being used, ensuring consistency across the codebase.Also applies to: 60-60, 81-81
rocketmq-client/src/consumer/consumer_impl/re_balance.rs (1)
76-76: Updated Signature ofunlockRequires VerificationThe method
unlockhas been changed to be asynchronous and now takes&mut selfinstead of&self, andmqis now passed by reference&MessageQueueinstead of by valueMessageQueue. Ensure that all implementations of theRebalanceLocaltrait and all calls tounlockare updated to match the new signature.Run the following script to verify all usages of
unlock:✅ Verification successful
Verification Successful: All
unlockMethod Usages Are Correctly UpdatedAll implementations of the
RebalanceLocaltrait and calls to theunlockmethod have been updated to match the new asynchronous signature.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Find all implementations and calls of `unlock` that may need updating. # Find all implementations of `RebalanceLocal` and inspect `unlock` method signatures. rg --type rust 'impl.*RebalanceLocal' -A 20 | rg 'fn unlock' # Find all calls to the `unlock` method. rg --type rust 'unlock\('Length of output: 635
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (2)
20-20: ImportingDurationfor timeout handlingThe addition of
use std::time::Duration;is appropriate and necessary for handling timeouts when attempting to acquire locks.
32-32: ImportingUnlockBatchRequestBodyfor unlock requestsThe import of
UnlockBatchRequestBodyis essential for constructing unlock batch requests to the broker.rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (3)
421-467:⚠️ Potential issueModified
lockmethod signature impacts existing implementationsThe
lockmethod signature has changed to:pub async fn lock( &mut self, mq: &MessageQueue, process_queue_table: &mut HashMap<MessageQueue, Arc<ProcessQueue>>, ) -> boolThis modification requires updating all calls to
lockto passprocess_queue_table. Ensure that all callers are updated accordingly to prevent compilation errors.Run the following script to find all usages of
lockand verify they are updated:#!/bin/bash # Find all usages of the `lock` method rg --type rust 'self\.lock\('
207-227: Potential race condition when modifyingprocess_queue_tableWhen removing items from
process_queue_tableat lines 219-220, ensure that no other asynchronous tasks are modifying the same data concurrently without proper synchronization. Since the write lock is acquired, this should prevent concurrent writes, but verify that all accesses are properly synchronized.Automate verification by checking all accesses to
process_queue_table:Ensure that all accesses are protected by appropriate locks.
421-467: Potential infinite recursion inlockmethodThe
lockmethod callsclient.get_broker_name_from_message_queue(mq).await, which may internally rely on methods that eventually calllockagain, leading to potential infinite recursion.Check the implementation of
get_broker_name_from_message_queueto ensure it doesn't create a recursive loop.
| impl Display for LockBatchRequestBody { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| write!( | ||
| f, | ||
| "UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ | ||
| mq_set={:?}]", | ||
| self.consumer_group.as_ref().unwrap_or(&"".to_string()), | ||
| self.client_id.as_ref().unwrap_or(&"".to_string()), | ||
| self.only_this_broker, | ||
| self.mq_set | ||
| ) | ||
| } | ||
| } |
There was a problem hiding this comment.
Fix incorrect struct name in Display implementation.
The Display implementation for LockBatchRequestBody is generally well-implemented. However, there's a minor issue in the output string:
The struct name in the output string is incorrect. It says "UnlockBatchRequestBody" instead of "LockBatchRequestBody". Please apply the following fix:
- "UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \
+ "LockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \
mq_set={:?}]",This will ensure that the string representation accurately reflects the struct name.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| impl Display for LockBatchRequestBody { | |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
| write!( | |
| f, | |
| "UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ | |
| mq_set={:?}]", | |
| self.consumer_group.as_ref().unwrap_or(&"".to_string()), | |
| self.client_id.as_ref().unwrap_or(&"".to_string()), | |
| self.only_this_broker, | |
| self.mq_set | |
| ) | |
| } | |
| } | |
| impl Display for LockBatchRequestBody { | |
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
| write!( | |
| f, | |
| "LockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \ | |
| mq_set={:?}]", | |
| self.consumer_group.as_ref().unwrap_or(&"".to_string()), | |
| self.client_id.as_ref().unwrap_or(&"".to_string()), | |
| self.only_this_broker, | |
| self.mq_set | |
| ) | |
| } | |
| } |
| impl FromMap for LockBatchMqRequestHeader { | ||
| type Target = Self; | ||
|
|
||
| fn from(map: &HashMap<String, String>) -> Option<Self::Target> { | ||
| let rpc_request_header = <RpcRequestHeader as FromMap>::from(map); | ||
| Some(LockBatchMqRequestHeader { rpc_request_header }) | ||
| } | ||
| } |
There was a problem hiding this comment.
Improve error handling and consistency with to_map.
While the implementation of FromMap for LockBatchMqRequestHeader is functional, there are opportunities for improvement:
-
Error Handling: The current implementation always returns
Some(...), even ifRpcRequestHeader::fromreturnsNone. This could lead to unexpected behavior. -
Consistency: The
to_mapmethod returnsNonefor an empty map (after the suggested improvement), butfromalways returnsSome(...). This inconsistency could be confusing.
Consider the following implementation to address these issues:
impl FromMap for LockBatchMqRequestHeader {
type Target = Self;
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
if map.is_empty() {
None
} else {
Some(LockBatchMqRequestHeader {
rpc_request_header: <RpcRequestHeader as FromMap>::from(map),
})
}
}
}This implementation:
- Returns
Noneif the input map is empty, maintaining consistency with theto_mapmethod. - Preserves the
Optionreturned byRpcRequestHeader::from, allowing for better error handling downstream.
| impl CommandCustomHeader for UnlockBatchMqRequestHeader { | ||
| fn to_map(&self) -> Option<HashMap<String, String>> { | ||
| let mut map = HashMap::new(); | ||
| if let Some(value) = self.rpc_request_header.as_ref() { | ||
| if let Some(value) = value.to_map() { | ||
| map.extend(value); | ||
| } | ||
| } | ||
| Some(map) | ||
| } | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Consider optimizing the to_map method implementation.
The current implementation of to_map is functional but can be improved for better efficiency and idiomatic Rust.
Consider refactoring the to_map method as follows:
impl CommandCustomHeader for UnlockBatchMqRequestHeader {
fn to_map(&self) -> Option<HashMap<String, String>> {
self.rpc_request_header
.as_ref()
.and_then(|header| header.to_map())
}
}This implementation:
- Directly returns
Noneifrpc_request_headerisNone. - Only creates a
HashMapifrpc_request_headerisSomeand itsto_mapmethod returnsSome. - Avoids creating an empty
HashMapwhen it's not needed.
| impl FromMap for UnlockBatchMqRequestHeader { | ||
| type Target = Self; | ||
|
|
||
| fn from(map: &HashMap<String, String>) -> Option<Self::Target> { | ||
| let rpc_request_header = <RpcRequestHeader as FromMap>::from(map); | ||
| Some(UnlockBatchMqRequestHeader { rpc_request_header }) | ||
| } | ||
| } |
There was a problem hiding this comment.
Improve error handling in the FromMap implementation.
The current implementation of from always returns Some, which might not be appropriate for all cases.
Consider refactoring the from method to handle potential errors:
impl FromMap for UnlockBatchMqRequestHeader {
type Target = Self;
fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
let rpc_request_header = <RpcRequestHeader as FromMap>::from(map);
// Only create the UnlockBatchMqRequestHeader if RpcRequestHeader was successfully created
rpc_request_header.map(|header| UnlockBatchMqRequestHeader {
rpc_request_header: Some(header),
})
}
}This implementation:
- Only creates an
UnlockBatchMqRequestHeaderifRpcRequestHeaderwas successfully created from the map. - Returns
NoneifRpcRequestHeadercouldn't be created, which is more appropriate for error cases.
| } | ||
|
|
||
| fn consume_type(&self) -> ConsumeType; | ||
|
|
There was a problem hiding this comment.
Consider Changing &self to &mut self in remove_dirty_offset
The method remove_dirty_offset may modify internal state when removing offsets. If this is the case, it should take a mutable reference &mut self instead of an immutable reference &self to reflect that it mutates the receiver.
| use rocketmq_common::ArcRefCellWrapper; | ||
| use rocketmq_common::TimeUtils::get_current_millis; | ||
| use rocketmq_common::WeakCellWrapper; | ||
| use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; |
There was a problem hiding this comment.
Unused import LockBatchRequestBody
The import LockBatchRequestBody is added but not used in the code provided. If it's intended for future use, consider adding a comment explaining its purpose. Otherwise, remove the unused import.
Apply this diff to remove the unused import:
-use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; |
| async fn build_process_queue_table_by_broker_name( | ||
| &self, | ||
| ) -> HashMap<String /* brokerName */, HashSet<MessageQueue>> { | ||
| let mut result = HashMap::new(); | ||
| let process_queue_table = self.process_queue_table.read().await; | ||
| let client = self.client_instance.as_ref().unwrap(); | ||
| for (mq, pq) in process_queue_table.iter() { | ||
| if pq.is_dropped() { | ||
| continue; | ||
| } | ||
| let broker_name = client.get_broker_name_from_message_queue(mq).await; | ||
| let entry = result.entry(broker_name).or_insert(HashSet::new()); | ||
| entry.insert(mq.clone()); | ||
| } | ||
| result |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Inefficient cloning in build_process_queue_table_by_broker_name
At line 536, message queues are cloned and inserted into a HashSet. If MessageQueue implements Copy, cloning can be avoided. If not, consider if it's necessary to clone here or if references can be used instead.
Adjust the code to avoid unnecessary cloning:
- entry.insert(mq.clone());
+ entry.insert(mq.to_owned());Ensure that to_owned() is implemented efficiently for MessageQueue.
Committable suggestion was skipped due to low confidence.
| pub async fn unlock_batch_mq( | ||
| &mut self, | ||
| addr: &str, | ||
| request_body: UnlockBatchRequestBody, | ||
| timeout_millis: u64, | ||
| oneway: bool, | ||
| ) -> Result<()> { | ||
| let mut request = RemotingCommand::create_request_command( | ||
| RequestCode::UnlockBatchMq, | ||
| UnlockBatchMqRequestHeader::default(), | ||
| ); | ||
| request.set_body_mut_ref(Some(request_body.encode())); | ||
| if oneway { | ||
| self.remoting_client | ||
| .invoke_oneway(addr.to_string(), request, timeout_millis) | ||
| .await; | ||
| Ok(()) | ||
| } else { | ||
| let response = self | ||
| .remoting_client | ||
| .invoke_async( | ||
| Some(mix_all::broker_vip_channel( | ||
| self.client_config.vip_channel_enabled, | ||
| addr, | ||
| )), | ||
| request, | ||
| timeout_millis, | ||
| ) | ||
| .await?; | ||
| if ResponseCode::from(response.code()) == ResponseCode::Success { | ||
| Ok(()) | ||
| } else { | ||
| Err(MQBrokerError( | ||
| response.code(), | ||
| response.remark().map_or("".to_string(), |s| s.to_string()), | ||
| addr.to_string(), | ||
| )) | ||
| } | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Refactor to reduce code duplication in unlock_batch_mq
Both branches of the if oneway condition in unlock_batch_mq share similar logic for creating the request and handling the response. Consider refactoring to reduce code duplication and improve maintainability.
You could extract the common request creation into a separate function or reorganize the code to minimize repetition.
| LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref()) | ||
| .map(|body| body.lock_ok_mq_set) | ||
| .map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string())) |
There was a problem hiding this comment.
Avoid potential panic due to unwrap() on response.body()
In lock_batch_mq, the call to response.body().as_ref().unwrap() may panic if response.body() is None. This can happen if the response does not contain a body, leading to a runtime panic.
To prevent a potential panic, you should check if response.body() is Some before attempting to use it. Handle the None case appropriately.
Apply this diff:
- LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref())
+ if let Some(body) = response.body() {
+ LockBatchResponseBody::decode(body.as_ref())
.map(|body| body.lock_ok_mq_set)
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string()))
+ } else {
+ Err(MQBrokerError(
+ response.code(),
+ "Response body is empty".to_string(),
+ addr.to_string(),
+ ))
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref()) | |
| .map(|body| body.lock_ok_mq_set) | |
| .map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string())) | |
| if let Some(body) = response.body() { | |
| LockBatchResponseBody::decode(body.as_ref()) | |
| .map(|body| body.lock_ok_mq_set) | |
| .map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string())) | |
| } else { | |
| Err(MQBrokerError( | |
| response.code(), | |
| "Response body is empty".to_string(), | |
| addr.to_string(), | |
| )) | |
| } |
| self.remoting_client | ||
| .invoke_oneway(addr.to_string(), request, timeout_millis) | ||
| .await; |
There was a problem hiding this comment.
Handle errors from invoke_oneway in unlock_batch_mq
In the oneway branch of unlock_batch_mq, the result of invoke_oneway is awaited, but any potential errors are not being handled. This could lead to unhandled errors if the method fails.
To properly handle errors, you should propagate the error by using await?, so that any errors are returned to the caller.
Apply this diff:
self.remoting_client
- .invoke_oneway(addr.to_string(), request, timeout_millis)
- .await;
+ .invoke_oneway(addr.to_string(), request, timeout_millis)
+ .await?;
Ok(())📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| self.remoting_client | |
| .invoke_oneway(addr.to_string(), request, timeout_millis) | |
| .await; | |
| self.remoting_client | |
| .invoke_oneway(addr.to_string(), request, timeout_millis) | |
| .await?; | |
| Ok(()) |
Ensure consistent use of VIP channel in invoke_oneway
In the oneway branch, invoke_oneway is called with addr.to_string(), while in the else branch, invoke_async uses mix_all::broker_vip_channel to potentially adjust the address based on the VIP channel configuration. This inconsistency may cause issues when vip_channel_enabled is true.
Consider modifying the invoke_oneway call to use mix_all::broker_vip_channel for consistent address handling.
Apply this diff:
self.remoting_client
- .invoke_oneway(addr.to_string(), request, timeout_millis)
+ .invoke_oneway(
+ mix_all::broker_vip_channel(self.client_config.vip_channel_enabled, addr),
+ request,
+ timeout_millis,
+ )
.await?;
Ok(())📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| self.remoting_client | |
| .invoke_oneway(addr.to_string(), request, timeout_millis) | |
| .await; | |
| self.remoting_client | |
| .invoke_oneway( | |
| mix_all::broker_vip_channel(self.client_config.vip_channel_enabled, addr), | |
| request, | |
| timeout_millis, | |
| ) | |
| .await; |
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
Which Issue(s) This PR Fixes(Closes)
Fixes #1004
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
ProcessQueuestruct with methods for better tracking of unlock attempts and timestamps.RebalanceLocaltrait with several new asynchronous methods for improved message queue management.Bug Fixes
Documentation
These updates aim to enhance the overall user experience by improving the efficiency and reliability of message queue operations.