@@ -38,8 +38,9 @@ use rocketmq_remoting::protocol::request_source::RequestSource;
3838use rocketmq_remoting:: protocol:: static_topic:: topic_queue_mapping_context:: TopicQueueMappingContext ;
3939use rocketmq_remoting:: protocol:: static_topic:: topic_queue_mapping_detail:: TopicQueueMappingDetail ;
4040use rocketmq_remoting:: protocol:: static_topic:: topic_queue_mapping_utils:: TopicQueueMappingUtils ;
41+ use rocketmq_remoting:: rpc:: rpc_client:: RpcClient ;
4142use rocketmq_remoting:: rpc:: rpc_client_utils:: RpcClientUtils ;
42- use rocketmq_remoting:: rpc:: rpc_response :: RpcResponse ;
43+ use rocketmq_remoting:: rpc:: rpc_request :: RpcRequest ;
4344use rocketmq_remoting:: runtime:: server:: ConnectionHandlerContext ;
4445use rocketmq_store:: base:: get_message_result:: GetMessageResult ;
4546use rocketmq_store:: base:: message_status_enum:: GetMessageStatus ;
@@ -59,6 +60,7 @@ use crate::filter::expression_message_filter::ExpressionMessageFilter;
5960use crate :: filter:: manager:: consumer_filter_manager:: ConsumerFilterManager ;
6061use crate :: offset:: manager:: broadcast_offset_manager:: BroadcastOffsetManager ;
6162use crate :: offset:: manager:: consumer_offset_manager:: ConsumerOffsetManager ;
63+ use crate :: out_api:: broker_outer_api:: BrokerOuterAPI ;
6264use crate :: processor:: pull_message_result_handler:: PullMessageResultHandler ;
6365use crate :: subscription:: manager:: subscription_group_manager:: SubscriptionGroupManager ;
6466use crate :: topic:: manager:: topic_config_manager:: TopicConfigManager ;
@@ -77,6 +79,7 @@ pub struct PullMessageProcessor<MS> {
7779 broadcast_offset_manager : Arc < BroadcastOffsetManager > ,
7880 message_store : Arc < MS > ,
7981 cold_data_cg_ctr_service : Arc < ColdDataCgCtrService > ,
82+ broker_outer_api : Arc < BrokerOuterAPI > ,
8083}
8184
8285impl < MS > PullMessageProcessor < MS > {
@@ -91,6 +94,7 @@ impl<MS> PullMessageProcessor<MS> {
9194 consumer_offset_manager : Arc < ConsumerOffsetManager > ,
9295 broadcast_offset_manager : Arc < BroadcastOffsetManager > ,
9396 message_store : Arc < MS > ,
97+ broker_outer_api : Arc < BrokerOuterAPI > ,
9498 ) -> Self {
9599 Self {
96100 pull_message_result_handler,
@@ -104,10 +108,12 @@ impl<MS> PullMessageProcessor<MS> {
104108 broadcast_offset_manager,
105109 message_store,
106110 cold_data_cg_ctr_service : Arc :: new ( Default :: default ( ) ) ,
111+ broker_outer_api,
107112 }
108113 }
109114
110- pub fn rewrite_request_for_static_topic (
115+ pub async fn rewrite_request_for_static_topic (
116+ & self ,
111117 request_header : & mut PullMessageRequestHeader ,
112118 mapping_context : & mut TopicQueueMappingContext ,
113119 ) -> Option < RemotingCommand > {
@@ -173,16 +179,31 @@ impl<MS> PullMessageProcessor<MS> {
173179 sys_flag = PullSysFlag :: clear_suspend_flag ( sys_flag as u32 ) as i32 ;
174180 sys_flag = PullSysFlag :: clear_commit_offset_flag ( sys_flag as u32 ) as i32 ;
175181 request_header. sys_flag = sys_flag;
176- /* let rpc_request = RpcRequest::new(RequestCode::PullMessage, request_header.clone(), None);
177- let rpc_response = broker_controller
182+ let rpc_request = RpcRequest :: new (
183+ RequestCode :: PullMessage . to_i32 ( ) ,
184+ request_header
185+ . topic_request
186+ . as_ref ( )
187+ . unwrap ( )
188+ . rpc
189+ . clone ( )
190+ . unwrap ( ) ,
191+ None ,
192+ ) ;
193+ let rpc_response = self
178194 . broker_outer_api
179- .rpc_client
180- .invoke(rpc_request, broker_controller.broker_config.forward_timeout)?;
181- if rpc_response.exception.is_some() {
182- return Err(rpc_response.exception.unwrap());
183- }*/
184-
185- let rpc_response = RpcResponse :: default ( ) ;
195+ . rpc_client ( )
196+ . invoke ( rpc_request, self . broker_config . forward_timeout )
197+ . await ;
198+ let rpc_response = match rpc_response {
199+ Ok ( value) => value,
200+ Err ( err) => {
201+ return Some ( RemotingCommand :: create_response_command_with_code_remark (
202+ ResponseCode :: SystemError ,
203+ format ! ( "invoke rpc failed: {:?}" , err) ,
204+ ) ) ;
205+ }
206+ } ;
186207 let response_header = rpc_response. get_header_mut :: < PullMessageResponseHeader > ( ) ;
187208 let rewrite_result = rewrite_response_for_static_topic (
188209 request_header,
@@ -443,10 +464,10 @@ where
443464 let mut topic_queue_mapping_context = self
444465 . topic_queue_mapping_manager
445466 . build_topic_queue_mapping_context ( & request_header, false ) ;
446- if let Some ( resp) = Self :: rewrite_request_for_static_topic (
447- & mut request_header,
448- & mut topic_queue_mapping_context ,
449- ) {
467+ if let Some ( resp) = self
468+ . rewrite_request_for_static_topic ( & mut request_header, & mut topic_queue_mapping_context )
469+ . await
470+ {
450471 return Some ( resp) ;
451472 }
452473 if request_header. queue_id . is_none ( )
0 commit comments