Conversation
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
WalkthroughThe overall changes implement new functionalities and improvements across the RocketMQ project. Key additions include creating a Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant RpcClientImpl
participant RocketmqDefaultClient
Client->>RpcClientImpl: Request RPC operation
RpcClientImpl->>RocketmqDefaultClient: Invoke remote method
RocketmqDefaultClient-->>RpcClientImpl: Return response
RpcClientImpl-->>Client: Return result
Assessment against linked issues
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Actionable comments posted: 8
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
Files selected for processing (19)
- rocketmq-common/src/common/message/message_queue.rs (1 hunks)
- rocketmq-remoting/Cargo.toml (1 hunks)
- rocketmq-remoting/src/protocol/body/broker_body/cluster_info.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/get_earliest_msg_storetime_response_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/get_max_offset_response_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/get_min_offset_response_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/search_offset_response_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/route/route_data_view.rs (4 hunks)
- rocketmq-remoting/src/protocol/static_topic/topic_queue_info.rs (3 hunks)
- rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs (2 hunks)
- rocketmq-remoting/src/rpc.rs (1 hunks)
- rocketmq-remoting/src/rpc/client_metadata.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_client.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_client_hook.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_client_impl.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_client_utils.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_request.rs (1 hunks)
- rocketmq-remoting/src/rpc/rpc_response.rs (2 hunks)
Files skipped from review due to trivial changes (2)
- rocketmq-remoting/Cargo.toml
- rocketmq-remoting/src/rpc.rs
Additional comments not posted (33)
rocketmq-remoting/src/protocol/header/get_max_offset_response_header.rs (1)
1-24: LGTM!The
GetMaxOffsetResponseHeaderstruct is well-defined, and the necessary traits are derived correctly. The file complies with the Apache License.rocketmq-remoting/src/protocol/header/get_min_offset_response_header.rs (1)
1-24: LGTM!The
GetMinOffsetResponseHeaderstruct is well-defined, and the necessary traits are derived correctly. The file complies with the Apache License.rocketmq-remoting/src/protocol/header/search_offset_response_header.rs (1)
1-24: LGTM!The
SearchOffsetResponseHeaderstruct is well-defined, and the necessary traits are derived correctly. The file complies with the Apache License.rocketmq-remoting/src/protocol/header/get_earliest_msg_storetime_response_header.rs (1)
1-24: LGTM!The
GetEarliestMsgStoretimeResponseHeaderstruct is well-defined, and the necessary traits are derived correctly. The file complies with the Apache License.rocketmq-remoting/src/rpc/rpc_request.rs (1)
25-25: LGTM! Ensuring thread-safety with theSendtrait.The addition of the
Sendtrait to thebodyfield and its constructor is crucial for safe multi-threaded operations.Also applies to: 29-31
rocketmq-remoting/src/rpc/rpc_client_hook.rs (1)
21-28: LGTM! Useful hooks for RPC operations.The
before_requestandafter_responsemethods provide useful hooks for custom logic before and after RPC calls, enhancing flexibility.rocketmq-remoting/src/protocol/header.rs (1)
21-23: LGTM! Enhancing protocol capabilities with new response headers.The addition of new response header modules enhances the protocol's capability to manage diverse responses effectively.
Also applies to: 30-30
rocketmq-remoting/src/rpc/rpc_client.rs (1)
23-37: LGTM! Enhancing RPC handling with new invocation methods.The
invokeandinvoke_mqmethods provide robust mechanisms for handling RPC calls, enhancing the client's capability to interact with remote services.rocketmq-remoting/src/protocol/body/broker_body/cluster_info.rs (2)
29-29: Verify the necessity of makingbroker_addr_tablepublic.Making the field public increases accessibility but might expose internal state unnecessarily. Ensure this change is intentional and doesn't violate encapsulation principles.
32-32: Verify the necessity of makingcluster_addr_tablepublic.Making the field public increases accessibility but might expose internal state unnecessarily. Ensure this change is intentional and doesn't violate encapsulation principles.
rocketmq-remoting/src/protocol/static_topic/topic_queue_info.rs (1)
27-27: LGTM! ChangingscopetoOption<String>enhances flexibility.The change allows for cases where the scope might be unavailable. The default value ensures proper initialization.
rocketmq-remoting/src/rpc/rpc_client_utils.rs (1)
32-36: LGTM! Conditional body setting increate_command_for_rpc_requestenhances robustness.The change ensures that the body is only set if present, preventing potential errors when the body is
None.rocketmq-common/src/common/message/message_queue.rs (3)
24-25: LGTM! Adding traits toMessageQueueenhances flexibility.The additional traits (
Clone,Hash,Ord,PartialOrd, etc.) increase the flexibility and usability of theMessageQueuestruct.
31-78: LGTM! New methods inMessageQueueprovide a comprehensive API.The methods (
new,from_other,from_parts,get_topic,set_topic,get_broker_name,set_broker_name,get_queue_id,set_queue_id) offer a complete set of functionalities for creating and managingMessageQueueinstances.
80-92: LGTM!fmt::DisplayandDefaultimplementations enhance usability.The
fmt::DisplayandDefaultimplementations make it easier to debug and useMessageQueueinstances.rocketmq-remoting/src/rpc/rpc_response.rs (3)
60-68: LGTM! Thenew_exceptionmethod is correctly implemented.The method initializes an
RpcResponsewith an optionalRpcException, setting the code based on the exception if it exists.
70-81: LGTM! Thenewmethod is correctly implemented.The method initializes an
RpcResponsewith a code, header, and optional body.
83-90: LGTM! Thenew_optionmethod is correctly implemented.The method initializes an
RpcResponsewith a code and optional body.rocketmq-remoting/src/protocol/static_topic/topic_queue_mapping_utils.rs (1)
83-95: LGTM! Theget_mock_broker_namefunction is correctly implemented.The function generates a mock broker name based on the provided scope, handling different cases appropriately.
rocketmq-remoting/src/protocol/route/route_data_view.rs (3)
Line range hint
25-107:
LGTM! The enhancements to theBrokerDatastruct are correctly implemented.The additional traits and methods provide useful functionalities for managing broker data.
Line range hint
108-158:
LGTM! The enhancements to theQueueDatastruct are correctly implemented.The additional traits and methods provide useful functionalities for managing queue data.
Line range hint
159-207:
LGTM! The enhancements to theTopicRouteDatastruct are correctly implemented.The additional traits and methods provide useful functionalities for managing topic route data.
rocketmq-remoting/src/rpc/client_metadata.rs (7)
1-55: LGTM! TheClientMetadatastruct and its initialization method are correctly implemented.The struct definition and the
newmethod correctly initialize theClientMetadatawith default values.
57-87: LGTM! Thefresh_topic_routemethod is correctly implemented.The method refreshes the topic route data efficiently, updating the broker address table and topic endpoints table.
89-98: LGTM! Theget_broker_name_from_message_queuemethod is correctly implemented.The method retrieves the broker name from a message queue efficiently.
100-114: LGTM! Therefresh_cluster_infomethod is correctly implemented.The method refreshes the cluster information efficiently, updating the broker address table.
116-126: LGTM! Thefind_master_broker_addrmethod is correctly implemented.The method finds the master broker address efficiently.
128-212: LGTM! Thetopic_route_data2endpoints_for_static_topicmethod is correctly implemented.The method converts topic route data to endpoints for static topics efficiently.
214-216: LGTM! Thebroker_addr_tablemethod is correctly implemented.The method retrieves the broker address table efficiently.
rocketmq-remoting/src/rpc/rpc_client_impl.rs (4)
48-54: LGTM!The
newmethod correctly initializes theRpcClientImplstruct.
56-61: LGTM!The
register_client_hookmethod correctly adds a client hook to the list.
63-65: LGTM!The
clear_client_hookmethod correctly clears the client hook list.
67-77: LGTM!The
get_broker_addr_by_name_or_exceptionmethod correctly retrieves the broker address and handles the error case.
| async fn handle_pull_message( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success | ||
| | ResponseCode::PullNotFound | ||
| | ResponseCode::PullRetryImmediately | ||
| | ResponseCode::PullOffsetMoved => { | ||
| let response_header = response | ||
| .decode_command_custom_header::<PullMessageResponseHeader>() | ||
| .unwrap(); | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new(response.code(), response_header, body); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unexpected remote response code".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_pull_message(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
if matches!(
ResponseCode::from(response.code()),
ResponseCode::Success
| ResponseCode::PullNotFound
| ResponseCode::PullRetryImmediately
| ResponseCode::PullOffsetMoved
) {
let response_header = response
.decode_command_custom_header::<PullMessageResponseHeader>()
.unwrap();
let body = response
.body()
.as_ref()
.map(|value| Box::new(value.clone()) as Box<dyn Any>);
let rpc_response = RpcResponse::new(response.code(), response_header, body);
Ok(rpc_response)
} else {
Ok(RpcResponse::new_exception(Some(RpcException(
response.code(),
"unexpected remote response code".to_string(),
))))
}
}
Err(_error) => Err(RpcException(
From::from(ResponseCode::SystemError),
format!("process failed. addr: {}. Request", addr),
)),
}
}| async fn handle_get_min_offset( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success => { | ||
| let response_header = response | ||
| .decode_command_custom_header::<GetMinOffsetResponseHeader>() | ||
| .unwrap(); | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new(response.code(), response_header, body); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unknown remote error".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_get_min_offset(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<GetMinOffsetResponseHeader>()
.unwrap();
let body = response
.body()
.as_ref()
.map(|value| Box::new(value.clone()) as Box<dyn Any>);
let rpc_response = RpcResponse::new(response.code(), response_header, body);
Ok(rpc_response)
} else {
Ok(RpcResponse::new_exception(Some(RpcException(
response.code(),
"unknown remote error".to_string(),
))))
}
}
Err(_error) => Err(RpcException(
From::from(ResponseCode::SystemError),
format!("process failed. addr: {}. Request", addr),
)),
}
}| async fn handle_get_max_offset( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success => { | ||
| let response_header = response | ||
| .decode_command_custom_header::<GetMaxOffsetResponseHeader>() | ||
| .unwrap(); | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new(response.code(), response_header, body); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unknown remote error".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_get_max_offset(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<GetMaxOffsetResponseHeader>()
.unwrap();
let body = response
.body()
.as_ref()
.map(|value| Box::new(value.clone()) as Box<dyn Any>);
let rpc_response = RpcResponse::new(response.code(), response_header, body);
Ok(rpc_response)
} else {
Ok(RpcResponse::new_exception(Some(RpcException(
response.code(),
"unknown remote error".to_string(),
))))
}
}
Err(_error) => Err(RpcException(
From::from(ResponseCode::SystemError),
format!("process failed. addr: {}. Request", addr),
)),
}
}| async fn handle_search_offset( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success => { | ||
| let response_header = response | ||
| .decode_command_custom_header::<SearchOffsetResponseHeader>() | ||
| .unwrap(); | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new(response.code(), response_header, body); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unknown remote error".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_search_offset(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<SearchOffsetResponseHeader>()
.unwrap();
let body = response
.body()
.as_ref()
.map(|value| Box::new(value.clone()) as Box<dyn Any>);
let rpc_response = RpcResponse::new(response.code(), response_header, body);
Ok(rpc_response)
} else {
Ok(RpcResponse::new_exception(Some(RpcException(
response.code(),
"unknown remote error".to_string(),
))))
}
}
Err(_error) => Err(RpcException(
From::from(ResponseCode::SystemError),
format!("process failed. addr: {}. Request", addr),
)),
}
}| async fn handle_get_earliest_msg_storetime( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success => { | ||
| let response_header = response | ||
| .decode_command_custom_header::<GetEarliestMsgStoretimeResponseHeader>() | ||
| .unwrap(); | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new(response.code(), response_header, body); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unknown remote error".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_get_earliest_msg_storetime(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<GetEarliestMsgStoretimeResponseHeader>()
.unwrap();
let body = response
.body()
.as_ref()
.map(|value| Box::new(value.clone()) as Box<dyn Any>);
let rpc_response = RpcResponse::new(response.code(), response_header, body);
Ok(rpc_response)
} else {
Ok(RpcResponse::new_exception(Some(RpcException(
response.code(),
"unknown remote error".to_string(),
))))
}
}
Err(_error) => Err(RpcException(
From::from(ResponseCode::SystemError),
format!("process failed. addr: {}. Request", addr),
)),
}
}| async fn handle_query_consumer_offset( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success => { | ||
| let response_header = response | ||
| .decode_command_custom_header::<QueryConsumerOffsetResponseHeader>() | ||
| .unwrap(); | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new(response.code(), response_header, body); | ||
| Ok(rpc_response) | ||
| } | ||
| ResponseCode::QueryNotFound => { | ||
| let rpc_response = RpcResponse::new_option(response.code(), None); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unknown remote error".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_query_consumer_offset(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
match ResponseCode::from(response.code()) {
ResponseCode::Success => {
let response_header = response
.decode_command_custom_header::<QueryConsumerOffsetResponseHeader>()
.unwrap();
let body = response
.body()
.as_ref()
.map(|value| Box::new(value.clone()) as Box<dyn Any>);
let rpc_response = RpcResponse::new(response.code(), response_header, body);
Ok(rpc_response)
}
ResponseCode::QueryNotFound => {
let rpc_response = RpcResponse::new_option(response.code(), None);
Ok(rpc_response)
}
_ => Ok(RpcResponse::new_exception(Some(RpcException(
response.code(),
"unknown remote error".to_string(),
)))),
}
}
Err(_error) => Err(RpcException(
From::from(ResponseCode::SystemError),
format!("process failed. addr: {}. Request", addr),
)),
}
}| async fn handle_update_consumer_offset( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success => { | ||
| let response_header = response | ||
| .decode_command_custom_header::<UpdateConsumerOffsetResponseHeader>() | ||
| .unwrap(); | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new(response.code(), response_header, body); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unknown remote error".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_update_consumer_offset(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
if ResponseCode::from(response.code()) == ResponseCode::Success {
let response_header = response
.decode_command_custom_header::<UpdateConsumerOffsetResponseHeader>()
.unwrap();
let body = response
.body()
.as_ref()
.map(|value| Box::new(value.clone()) as Box<dyn Any>);
let rpc_response = RpcResponse::new(response.code(), response_header, body);
Ok(rpc_response)
} else {
Ok(RpcResponse::new_exception(Some(RpcException(
response.code(),
"unknown remote error".to_string(),
))))
}
}
Err(_error) => Err(RpcException(
From::from(ResponseCode::SystemError),
format!("process failed. addr: {}. Request", addr),
)),
}
}| async fn handle_common_body_request( | ||
| &self, | ||
| addr: String, | ||
| request: RpcRequest, | ||
| timeout_millis: u64, | ||
| ) -> Result<RpcResponse, RpcException> { | ||
| let request_command = RpcClientUtils::create_command_for_rpc_request(request); | ||
| match self | ||
| .remoting_client | ||
| .invoke_async(addr.clone(), request_command, timeout_millis) | ||
| .await | ||
| { | ||
| Ok(response) => match ResponseCode::from(response.code()) { | ||
| ResponseCode::Success => { | ||
| let body = response | ||
| .body() | ||
| .as_ref() | ||
| .map(|value| Box::new(value.clone()) as Box<dyn Any>); | ||
| let rpc_response = RpcResponse::new_option(response.code(), body); | ||
| Ok(rpc_response) | ||
| } | ||
| _ => Ok(RpcResponse::new_exception(Some(RpcException( | ||
| response.code(), | ||
| "unknown remote error".to_string(), | ||
| )))), | ||
| }, | ||
| Err(_error) => Err(RpcException( | ||
| From::from(ResponseCode::SystemError), | ||
| format!("process failed. addr: {}. Request", addr), | ||
| )), | ||
| } |
There was a problem hiding this comment.
Refactor error handling for better readability.
The nested match statements can be simplified for better readability.
async fn handle_common_body_request(
&self,
addr: String,
request: RpcRequest,
timeout_millis: u64,
) -> Result<RpcResponse, RpcException> {
let request_command = RpcClientUtils::create_command_for_rpc_request(request);
let response = self
.remoting_client
.invoke_async(addr.clone(), request_command, timeout_millis)
.await;
match response {
Ok(response) => {
if ResponseCode::from(response.code()) == ResponseCode::Success {
let body = response
<!-- This is an auto-generated comment by CodeRabbit -->
Which Issue(s) This PR Fixes(Closes)
Fixes #750
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Enhancements
MessageQueuestruct with additional traits and methods for better manipulation and initialization.RpcRequestandRpcResponsestructs with enhanced constructors and field types.Dependency Updates
logdependency andbytesas adev-dependencyinrocketmq-remoting.Bug Fixes
cluster_info.rsto improve access control.These changes enhance the functionality, flexibility, and usability of the RocketMQ client, providing more robust management of topic routes and RPC operations.