Skip to content

Commit e3ecf6c

Browse files
committed
[ISSUE #761]🔥Optimize Pull message rewrite static topic🚀
1 parent 6c205cb commit e3ecf6c

6 files changed

Lines changed: 57 additions & 18 deletions

File tree

rocketmq-broker/src/broker_runtime.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,7 @@ impl BrokerRuntime {
395395
Arc::new(self.consumer_offset_manager.clone()),
396396
Arc::new(BroadcastOffsetManager::default()),
397397
message_store.clone(),
398+
self.broker_out_api.clone(),
398399
);
399400

400401
let consumer_manage_processor = ConsumerManageProcessor::new(

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use rocketmq_remoting::protocol::route::route_data_view::QueueData;
3737
use rocketmq_remoting::protocol::route::route_data_view::TopicRouteData;
3838
use rocketmq_remoting::protocol::RemotingSerializable;
3939
use rocketmq_remoting::remoting::RemotingService;
40+
use rocketmq_remoting::rpc::client_metadata::ClientMetadata;
41+
use rocketmq_remoting::rpc::rpc_client_impl::RpcClientImpl;
4042
use rocketmq_remoting::runtime::config::client_config::TokioClientConfig;
4143
use rocketmq_remoting::runtime::RPCHook;
4244
use tracing::error;
@@ -46,14 +48,19 @@ use tracing::info;
4648
pub struct BrokerOuterAPI {
4749
remoting_client: RocketmqDefaultClient,
4850
name_server_address: Option<String>,
51+
rpc_client: RpcClientImpl,
52+
client_metadata: ClientMetadata,
4953
}
5054

5155
impl BrokerOuterAPI {
5256
pub fn new(tokio_client_config: Arc<TokioClientConfig>) -> Self {
5357
let client = RocketmqDefaultClient::new(tokio_client_config);
58+
let client_metadata = ClientMetadata::new();
5459
Self {
55-
remoting_client: client,
60+
remoting_client: client.clone(),
5661
name_server_address: None,
62+
rpc_client: RpcClientImpl::new(client_metadata.clone(), client),
63+
client_metadata,
5764
}
5865
}
5966

@@ -62,12 +69,15 @@ impl BrokerOuterAPI {
6269
rpc_hook: Option<impl RPCHook>,
6370
) -> Self {
6471
let mut client = RocketmqDefaultClient::new(tokio_client_config);
72+
let client_metadata = ClientMetadata::new();
6573
if let Some(rpc_hook) = rpc_hook {
6674
client.register_rpc_hook(rpc_hook);
6775
}
6876
Self {
69-
remoting_client: client,
77+
remoting_client: client.clone(),
7078
name_server_address: None,
79+
rpc_client: RpcClientImpl::new(client_metadata.clone(), client),
80+
client_metadata,
7181
}
7282
}
7383

@@ -256,6 +266,10 @@ impl BrokerOuterAPI {
256266
pub fn shutdown(&self) {}
257267

258268
pub fn refresh_metadata(&self) {}
269+
270+
pub fn rpc_client(&self) -> &RpcClientImpl {
271+
&self.rpc_client
272+
}
259273
}
260274

261275
fn dns_lookup_address_by_domain(domain: &str) -> Vec<String> {

rocketmq-broker/src/processor/pull_message_processor.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ use rocketmq_remoting::protocol::request_source::RequestSource;
3838
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_context::TopicQueueMappingContext;
3939
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_detail::TopicQueueMappingDetail;
4040
use rocketmq_remoting::protocol::static_topic::topic_queue_mapping_utils::TopicQueueMappingUtils;
41+
use rocketmq_remoting::rpc::rpc_client::RpcClient;
4142
use rocketmq_remoting::rpc::rpc_client_utils::RpcClientUtils;
42-
use rocketmq_remoting::rpc::rpc_response::RpcResponse;
43+
use rocketmq_remoting::rpc::rpc_request::RpcRequest;
4344
use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
4445
use rocketmq_store::base::get_message_result::GetMessageResult;
4546
use rocketmq_store::base::message_status_enum::GetMessageStatus;
@@ -59,6 +60,7 @@ use crate::filter::expression_message_filter::ExpressionMessageFilter;
5960
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
6061
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
6162
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
63+
use crate::out_api::broker_outer_api::BrokerOuterAPI;
6264
use crate::processor::pull_message_result_handler::PullMessageResultHandler;
6365
use crate::subscription::manager::subscription_group_manager::SubscriptionGroupManager;
6466
use crate::topic::manager::topic_config_manager::TopicConfigManager;
@@ -77,6 +79,7 @@ pub struct PullMessageProcessor<MS> {
7779
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
7880
message_store: Arc<MS>,
7981
cold_data_cg_ctr_service: Arc<ColdDataCgCtrService>,
82+
broker_outer_api: Arc<BrokerOuterAPI>,
8083
}
8184

8285
impl<MS> PullMessageProcessor<MS> {
@@ -91,6 +94,7 @@ impl<MS> PullMessageProcessor<MS> {
9194
consumer_offset_manager: Arc<ConsumerOffsetManager>,
9295
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
9396
message_store: Arc<MS>,
97+
broker_outer_api: Arc<BrokerOuterAPI>,
9498
) -> Self {
9599
Self {
96100
pull_message_result_handler,
@@ -104,10 +108,12 @@ impl<MS> PullMessageProcessor<MS> {
104108
broadcast_offset_manager,
105109
message_store,
106110
cold_data_cg_ctr_service: Arc::new(Default::default()),
111+
broker_outer_api,
107112
}
108113
}
109114

110-
pub fn rewrite_request_for_static_topic(
115+
pub async fn rewrite_request_for_static_topic(
116+
&self,
111117
request_header: &mut PullMessageRequestHeader,
112118
mapping_context: &mut TopicQueueMappingContext,
113119
) -> Option<RemotingCommand> {
@@ -173,16 +179,31 @@ impl<MS> PullMessageProcessor<MS> {
173179
sys_flag = PullSysFlag::clear_suspend_flag(sys_flag as u32) as i32;
174180
sys_flag = PullSysFlag::clear_commit_offset_flag(sys_flag as u32) as i32;
175181
request_header.sys_flag = sys_flag;
176-
/* let rpc_request = RpcRequest::new(RequestCode::PullMessage, request_header.clone(), None);
177-
let rpc_response = broker_controller
182+
let rpc_request = RpcRequest::new(
183+
RequestCode::PullMessage.to_i32(),
184+
request_header
185+
.topic_request
186+
.as_ref()
187+
.unwrap()
188+
.rpc
189+
.clone()
190+
.unwrap(),
191+
None,
192+
);
193+
let rpc_response = self
178194
.broker_outer_api
179-
.rpc_client
180-
.invoke(rpc_request, broker_controller.broker_config.forward_timeout)?;
181-
if rpc_response.exception.is_some() {
182-
return Err(rpc_response.exception.unwrap());
183-
}*/
184-
185-
let rpc_response = RpcResponse::default();
195+
.rpc_client()
196+
.invoke(rpc_request, self.broker_config.forward_timeout)
197+
.await;
198+
let rpc_response = match rpc_response {
199+
Ok(value) => value,
200+
Err(err) => {
201+
return Some(RemotingCommand::create_response_command_with_code_remark(
202+
ResponseCode::SystemError,
203+
format!("invoke rpc failed: {:?}", err),
204+
));
205+
}
206+
};
186207
let response_header = rpc_response.get_header_mut::<PullMessageResponseHeader>();
187208
let rewrite_result = rewrite_response_for_static_topic(
188209
request_header,
@@ -443,10 +464,10 @@ where
443464
let mut topic_queue_mapping_context = self
444465
.topic_queue_mapping_manager
445466
.build_topic_queue_mapping_context(&request_header, false);
446-
if let Some(resp) = Self::rewrite_request_for_static_topic(
447-
&mut request_header,
448-
&mut topic_queue_mapping_context,
449-
) {
467+
if let Some(resp) = self
468+
.rewrite_request_for_static_topic(&mut request_header, &mut topic_queue_mapping_context)
469+
.await
470+
{
450471
return Some(resp);
451472
}
452473
if request_header.queue_id.is_none()

rocketmq-common/src/common/broker/broker_config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ pub struct BrokerConfig {
160160
pub max_error_rate_of_bloom_filter: i32,
161161
pub expect_consumer_num_use_filter: i32,
162162
pub bit_map_length_consume_queue_ext: i32,
163+
pub forward_timeout: u64,
163164
}
164165

165166
impl Default for BrokerConfig {
@@ -231,6 +232,7 @@ impl Default for BrokerConfig {
231232
max_error_rate_of_bloom_filter: 20,
232233
expect_consumer_num_use_filter: 32,
233234
bit_map_length_consume_queue_ext: 64,
235+
forward_timeout: 3 * 1000,
234236
}
235237
}
236238
}

rocketmq-remoting/src/rpc/client_metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::protocol::route::route_data_view::TopicRouteData;
2727
use crate::protocol::static_topic::topic_queue_info::TopicQueueMappingInfo;
2828
use crate::protocol::static_topic::topic_queue_mapping_utils::TopicQueueMappingUtils;
2929

30-
#[derive(Default)]
30+
#[derive(Default, Clone)]
3131
pub struct ClientMetadata {
3232
topic_route_table: Arc<RwLock<HashMap<String /* Topic */, TopicRouteData>>>,
3333
topic_end_points_table:

rocketmq-remoting/src/rpc/rpc_client_impl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::rpc::rpc_client_utils::RpcClientUtils;
3838
use crate::rpc::rpc_request::RpcRequest;
3939
use crate::rpc::rpc_response::RpcResponse;
4040

41+
#[derive(Clone)]
4142
pub struct RpcClientImpl {
4243
client_metadata: ClientMetadata,
4344
remoting_client: RocketmqDefaultClient,

0 commit comments

Comments
 (0)