Conversation
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
WalkthroughThis update introduces a new feature to the RocketMQ Broker, primarily focused on enhancing the handling of pull message results. Key changes include the addition of new imports, updates to the Changes
Sequence Diagram(s)sequenceDiagram
participant BrokerRuntime
participant PullRequestHoldService
participant DefaultMessageStore
participant Listener
BrokerRuntime->>PullRequestHoldService: Initialize with BrokerConfig
PullRequestHoldService->>DefaultMessageStore: Set message_arriving_listener
BrokerRuntime->>DefaultMessageStore: Notify message arrival
DefaultMessageStore->>Listener: Message arrival notification
Assessment against linked issues
Poem
Tip Early access features: enabledWe are currently testing the following features in early access:
Note:
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Actionable comments posted: 1
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (12)
- rocketmq-broker/src/broker_runtime.rs (9 hunks)
- rocketmq-broker/src/coldctr/cold_data_pull_request_hold_service.rs (1 hunks)
- rocketmq-broker/src/long_polling.rs (1 hunks)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (8 hunks)
- rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs (1 hunks)
- rocketmq-broker/src/long_polling/pull_request.rs (1 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (6 hunks)
- rocketmq-broker/src/util/hook_utils.rs (1 hunks)
- rocketmq-remoting/src/runtime/server.rs (1 hunks)
- rocketmq-store/src/base.rs (1 hunks)
- rocketmq-store/src/log_file.rs (2 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (7 hunks)
Files skipped from review due to trivial changes (1)
- rocketmq-broker/src/util/hook_utils.rs
Additional comments not posted (29)
rocketmq-broker/src/coldctr/cold_data_pull_request_hold_service.rs (1)
17-17: Constant Definition:NO_SUSPEND_KEYThe constant
NO_SUSPEND_KEYis appropriately defined and named. Ensure it is used consistently throughout the codebase.rocketmq-broker/src/long_polling.rs (1)
Line range hint
8-8: New Module:notify_message_arriving_listenerThe new module
notify_message_arriving_listeneris correctly added. Ensure that all references to the oldmessage_arriving_listenermodule are updated to use this new module.rocketmq-store/src/base.rs (1)
27-27: New Module:message_arriving_listenerThe new module
message_arriving_listeneris correctly added. Ensure that it is used appropriately for handling message arrival events.rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs (4)
19-19: Import Path UpdateThe import path for
MessageArrivingListenerhas been updated torocketmq_store::base::message_arriving_listener. Ensure that all references are updated accordingly.
21-21: New Struct:NotifyMessageArrivingListenerThe
NotifyMessageArrivingListenerstruct is correctly defined with thepull_request_hold_servicefield. Ensure that the field is correctly initialized and used.
Line range hint
23-29: Method:newThe
newmethod correctly initializes theNotifyMessageArrivingListenerstruct with thepull_request_hold_servicefield.
Line range hint
31-47: Implementation ofMessageArrivingListenerThe implementation of the
MessageArrivingListenertrait forNotifyMessageArrivingListeneris correct. Thearrivingmethod correctly delegates to thenotify_message_arriving_extmethod ofpull_request_hold_service.rocketmq-broker/src/long_polling/pull_request.rs (1)
92-94: LGTM!The new method
connection_handler_contextcorrectly returns a reference to thectxfield.rocketmq-store/src/log_file.rs (1)
112-112: LGTM!The new method
notify_message_arrive_if_necessaryis correctly added to theRocketMQMessageStoretrait.rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (5)
Line range hint
40-56: LGTM!The
PullRequestHoldService::newmethod correctly initializes thebroker_configfield.
66-89: LGTM!The
PullRequestHoldService::startmethod correctly implements the polling logic based on thebroker_config.
108-108: LGTM!The
check_hold_requestmethod correctly callsnotify_message_arriving.
162-162: LGTM!The
notify_message_arrivingmethod correctly callsexecute_request_when_wakeupwith the new context parameter.
201-201: LGTM!The
notify_master_onlinemethod correctly callsexecute_request_when_wakeupwith the new context parameter.rocketmq-broker/src/processor/pull_message_processor.rs (4)
53-53: New Import:NO_SUSPEND_KEY.The new import
NO_SUSPEND_KEYappears to be used in the updated methods.
75-75: Refactor: UseArcformessage_store.The
message_storefield is now wrapped inArc, improving thread safety and shared ownership.
324-324: Refactor: Asynchronousprocess_requestmethod.The
process_requestmethod has been updated to be asynchronous and now accepts additional parameters includingctx.
788-818: New Method: Asynchronousexecute_request_when_wakeup.The
execute_request_when_wakeupmethod now includes asynchronous processing and usesNO_SUSPEND_KEY.rocketmq-broker/src/broker_runtime.rs (6)
57-58: New Imports:PullRequestHoldServiceandNotifyMessageArrivingListener.The new imports are used for handling pull requests and notifying message arrivals.
108-109: New Field:pull_request_hold_service.The
BrokerRuntimestruct now includes an optionalpull_request_hold_servicefield, which is initialized and used within the struct's implementation.
141-141: Clone Implementation:pull_request_hold_service.The
pull_request_hold_servicefield is correctly cloned in theCloneimplementation ofBrokerRuntime.
215-215: Initialization:pull_request_hold_service.The
pull_request_hold_servicefield is correctly initialized toNonein thenewmethod.
Line range hint
364-407: Initialization:pull_request_hold_serviceandmessage_arriving_listener.The
init_processormethod correctly initializespull_request_hold_serviceand setsmessage_arriving_listener.
598-600: Startpull_request_hold_service.The
pull_request_hold_serviceis correctly started if it is initialized.rocketmq-store/src/message_store/default_message_store.rs (5)
59-59: Import Addition Approved.The import of
MessageArrivingListeneris necessary for the new functionality.
111-112: Field Addition Approved.The addition of the
message_arriving_listenerfield and its inclusion in theCloneimplementation andnewmethod is necessary to support the new functionality for handling pull message results.Also applies to: 139-139, 216-216
454-461: Method Addition Approved.The
set_message_arriving_listenermethod allows setting the listener for message arrivals, which is necessary for the new functionality.
975-989: Method Addition Approved.The
notify_message_arrive_if_necessarymethod handles the message arrival notifications, which is necessary for the new functionality.
1015-1017: Method Addition Approved.The
notify_message_arrive4multi_queuemethod is necessary for the new functionality.
| pub async fn write(&mut self, cmd: RemotingCommand) { | ||
| match self.connection.framed.send(cmd).await { | ||
| Ok(_) => {} | ||
| Err(error) => { | ||
| error!("send response failed: {}", error); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Ensure proper error handling in write method.
The write method logs an error if sending the response fails but does not handle the error beyond logging. Consider adding error handling logic to ensure robustness.
pub async fn write(&mut self, cmd: RemotingCommand) -> Result<(), Box<dyn std::error::Error>> {
match self.connection.framed.send(cmd).await {
Ok(_) => Ok(()),
Err(error) => {
error!("send response failed: {}", error);
Err(Box::new(error))
}
}
}
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #728 +/- ##
==========================================
- Coverage 27.15% 26.86% -0.29%
==========================================
Files 263 264 +1
Lines 20755 20844 +89
==========================================
- Hits 5636 5600 -36
- Misses 15119 15244 +125 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #722
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
pull_request_hold_serviceto enhance message processing hold capabilities.NO_SUSPEND_KEYconstant for improved message handling control.writemethod inConnectionHandlerContextWrapperfor asynchronous command handling.message_arriving_listenertoDefaultMessageStorefor enhanced message arrival notifications.Improvements
execute_request_when_wakeupfunction for asynchronous processing.