Skip to content

Commit bc75797

Browse files
committed
[ISSUE #1269]⚡️Optimize name server DefaultRequestProcessor#process_register_broker
1 parent 8ebea39 commit bc75797

3 files changed

Lines changed: 59 additions & 51 deletions

File tree

rocketmq-namesrv/src/processor/default_request_processor.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,7 @@ impl DefaultRequestProcessor {
225225
topic_config_wrapper = register_broker_body
226226
.topic_config_serialize_wrapper()
227227
.clone();
228-
register_broker_body
229-
.filter_server_list()
230-
.iter()
231-
.for_each(|s| {
232-
filter_server_list.push(s.clone());
233-
});
228+
filter_server_list = register_broker_body.filter_server_list().clone();
234229
} else {
235230
topic_config_wrapper = extract_register_topic_config_from_request(&request);
236231
}

rocketmq-namesrv/src/route/route_info_manager.rs

Lines changed: 57 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::collections::HashMap;
1919
use std::collections::HashSet;
2020
use std::net::SocketAddr;
21-
use std::time::SystemTime;
21+
use std::sync::Arc;
2222

2323
use cheetah_string::CheetahString;
2424
use rocketmq_common::common::config::TopicConfig;
@@ -28,6 +28,7 @@ use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
2828
use rocketmq_common::common::topic::TopicValidator;
2929
use rocketmq_common::common::TopicSysFlag;
3030
use rocketmq_common::TimeUtils;
31+
use rocketmq_common::TimeUtils::get_current_millis;
3132
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
3233
use rocketmq_remoting::clients::RemotingClient;
3334
use rocketmq_remoting::code::request_code::RequestCode;
@@ -82,6 +83,7 @@ pub struct RouteInfoManager {
8283
pub(crate) topic_queue_mapping_info_table: TopicQueueMappingInfoTable,
8384
pub(crate) namesrv_config: ArcMut<NamesrvConfig>,
8485
pub(crate) remoting_client: ArcMut<RocketmqDefaultClient>,
86+
lock: Arc<parking_lot::RwLock<()>>,
8587
}
8688

8789
#[allow(private_interfaces)]
@@ -99,14 +101,15 @@ impl RouteInfoManager {
99101
topic_queue_mapping_info_table: ArcMut::new(HashMap::new()),
100102
namesrv_config,
101103
remoting_client,
104+
lock: Arc::new(Default::default()),
102105
}
103106
}
104107
}
105108

106109
//impl register broker
107110
impl RouteInfoManager {
108111
pub fn register_broker(
109-
&mut self,
112+
&self,
110113
cluster_name: CheetahString,
111114
broker_addr: CheetahString,
112115
broker_name: CheetahString,
@@ -120,35 +123,39 @@ impl RouteInfoManager {
120123
remote_addr: SocketAddr,
121124
) -> Option<RegisterBrokerResult> {
122125
let mut result = RegisterBrokerResult::default();
126+
let _write = self.lock.write();
123127
//init or update cluster information
124-
if !self.cluster_addr_table.contains_key(&cluster_name) {
125-
self.cluster_addr_table
126-
.insert(cluster_name.clone(), HashSet::new());
127-
}
128128
self.cluster_addr_table
129-
.get_mut(&cluster_name)
130-
.unwrap()
129+
.mut_from_ref()
130+
.entry(cluster_name.clone())
131+
.or_default()
131132
.insert(broker_name.clone());
132133

133134
let enable_acting_master_inner = enable_acting_master.unwrap_or_default();
134-
let mut register_first =
135-
if let Some(broker_data) = self.broker_addr_table.get_mut(&broker_name) {
136-
broker_data.set_enable_acting_master(enable_acting_master_inner);
137-
broker_data.set_zone_name(zone_name.clone());
138-
false
139-
} else {
140-
let mut broker_data = BrokerData::new(
141-
cluster_name.clone(),
142-
broker_name.clone(),
143-
HashMap::new(),
144-
zone_name,
145-
);
146-
broker_data.set_enable_acting_master(enable_acting_master_inner);
147-
self.broker_addr_table
148-
.insert(broker_name.clone(), broker_data);
149-
true
150-
};
151-
let broker_data = self.broker_addr_table.get_mut(&broker_name).unwrap();
135+
let mut register_first = if let Some(broker_data) =
136+
self.broker_addr_table.mut_from_ref().get_mut(&broker_name)
137+
{
138+
broker_data.set_enable_acting_master(enable_acting_master_inner);
139+
broker_data.set_zone_name(zone_name.clone());
140+
false
141+
} else {
142+
let mut broker_data = BrokerData::new(
143+
cluster_name.clone(),
144+
broker_name.clone(),
145+
HashMap::new(),
146+
zone_name,
147+
);
148+
broker_data.set_enable_acting_master(enable_acting_master_inner);
149+
self.broker_addr_table
150+
.mut_from_ref()
151+
.insert(broker_name.clone(), broker_data);
152+
true
153+
};
154+
let broker_data = self
155+
.broker_addr_table
156+
.mut_from_ref()
157+
.get_mut(&broker_name)
158+
.unwrap();
152159
let mut prev_min_broker_id = 0i64;
153160
if !broker_data.broker_addrs().is_empty() {
154161
prev_min_broker_id = broker_data.broker_addrs().keys().min().copied().unwrap();
@@ -186,7 +193,7 @@ impl RouteInfoManager {
186193
&broker_addr,
187194
new_state_version
188195
);
189-
self.broker_live_table.remove(
196+
self.broker_live_table.mut_from_ref().remove(
190197
BrokerAddrInfo::new(cluster_name.clone(), broker_addr.clone()).as_ref(),
191198
);
192199
return Some(result);
@@ -244,7 +251,10 @@ impl RouteInfoManager {
244251
.map(|item| item.to_string())
245252
.collect::<HashSet<String>>();
246253
for to_delete_topic in to_delete_topics {
247-
let queue_data_map = self.topic_queue_table.get_mut(to_delete_topic.as_str());
254+
let queue_data_map = self
255+
.topic_queue_table
256+
.mut_from_ref()
257+
.get_mut(to_delete_topic.as_str());
248258
if let Some(queue_data) = queue_data_map {
249259
let removed_qd = queue_data.remove(&broker_name);
250260
if let Some(ref removed_qd_inner) = removed_qd {
@@ -254,7 +264,9 @@ impl RouteInfoManager {
254264
);
255265
}
256266
if queue_data.is_empty() {
257-
self.topic_queue_table.remove(to_delete_topic.as_str());
267+
self.topic_queue_table
268+
.mut_from_ref()
269+
.remove(to_delete_topic.as_str());
258270
}
259271
}
260272
}
@@ -285,9 +297,11 @@ impl RouteInfoManager {
285297
for (topic, vtq_info) in topic_queue_mapping_info_map {
286298
if !self.topic_queue_mapping_info_table.contains_key(topic) {
287299
self.topic_queue_mapping_info_table
300+
.mut_from_ref()
288301
.insert(topic.clone(), HashMap::new());
289302
}
290303
self.topic_queue_mapping_info_table
304+
.mut_from_ref()
291305
.get_mut(topic)
292306
.unwrap()
293307
.insert(vtq_info.bname.as_ref().unwrap().clone(), vtq_info.clone());
@@ -297,13 +311,10 @@ impl RouteInfoManager {
297311

298312
let broker_addr_info = BrokerAddrInfo::new(cluster_name.clone(), broker_addr.clone());
299313

300-
self.broker_live_table.insert(
314+
self.broker_live_table.mut_from_ref().insert(
301315
broker_addr_info.clone(),
302316
BrokerLiveInfo::new(
303-
SystemTime::now()
304-
.duration_since(SystemTime::UNIX_EPOCH)
305-
.expect("Time went backwards")
306-
.as_millis() as i64,
317+
get_current_millis() as i64,
307318
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME,
308319
topic_config_serialize_wrapper
309320
.topic_config_serialize_wrapper
@@ -314,9 +325,12 @@ impl RouteInfoManager {
314325
),
315326
);
316327
if filter_server_list.is_empty() {
317-
self.filter_server_table.remove(&broker_addr_info);
328+
self.filter_server_table
329+
.mut_from_ref()
330+
.remove(&broker_addr_info);
318331
} else {
319332
self.filter_server_table
333+
.mut_from_ref()
320334
.insert(broker_addr_info.clone(), filter_server_list);
321335
}
322336

@@ -345,6 +359,7 @@ impl RouteInfoManager {
345359
),
346360
)
347361
}
362+
drop(_write);
348363
Some(result)
349364
}
350365
}
@@ -471,7 +486,7 @@ impl RouteInfoManager {
471486
}
472487

473488
impl RouteInfoManager {
474-
fn topic_set_of_broker_name(&mut self, broker_name: &str) -> HashSet<String> {
489+
fn topic_set_of_broker_name(&self, broker_name: &str) -> HashSet<String> {
475490
let mut topic_of_broker = HashSet::new();
476491
for (key, value) in self.topic_queue_table.iter() {
477492
if value.contains_key(broker_name) {
@@ -482,7 +497,7 @@ impl RouteInfoManager {
482497
}
483498

484499
pub(crate) fn is_topic_config_changed(
485-
&mut self,
500+
&self,
486501
cluster_name: &CheetahString,
487502
broker_addr: &CheetahString,
488503
data_version: &DataVersion,
@@ -532,11 +547,7 @@ impl RouteInfoManager {
532547
None
533548
}
534549

535-
fn create_and_update_queue_data(
536-
&mut self,
537-
broker_name: &CheetahString,
538-
topic_config: TopicConfig,
539-
) {
550+
fn create_and_update_queue_data(&self, broker_name: &CheetahString, topic_config: TopicConfig) {
540551
let queue_data = QueueData::new(
541552
broker_name.clone(),
542553
topic_config.write_queue_nums,
@@ -547,6 +558,7 @@ impl RouteInfoManager {
547558

548559
let queue_data_map = self
549560
.topic_queue_table
561+
.mut_from_ref()
550562
.get_mut(topic_config.topic_name.as_ref().unwrap().as_str());
551563
if let Some(queue_data_map_inner) = queue_data_map {
552564
let existed_qd = queue_data_map_inner.get(broker_name);
@@ -572,15 +584,15 @@ impl RouteInfoManager {
572584
&queue_data
573585
);
574586
queue_data_map_inner.insert(broker_name.clone(), queue_data);
575-
self.topic_queue_table.insert(
587+
self.topic_queue_table.mut_from_ref().insert(
576588
topic_config.topic_name.as_ref().unwrap().clone(),
577589
queue_data_map_inner,
578590
);
579591
}
580592
}
581593

582594
fn notify_min_broker_id_changed(
583-
&mut self,
595+
&self,
584596
broker_addr_map: &HashMap<i64, CheetahString>,
585597
offline_broker_addr: Option<CheetahString>,
586598
ha_broker_addr: Option<CheetahString>,
@@ -622,7 +634,7 @@ impl RouteInfoManager {
622634
}
623635

624636
fn choose_broker_addrs_to_notify(
625-
&mut self,
637+
&self,
626638
broker_addr_map: &HashMap<i64, CheetahString>,
627639
offline_broker_addr: Option<CheetahString>,
628640
) -> Option<Vec<CheetahString>> {

rocketmq-remoting/src/protocol/route/route_data_view.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ impl BrokerData {
8181
self.zone_name = zone_name;
8282
}
8383

84+
#[inline]
8485
pub fn set_enable_acting_master(&mut self, enable_acting_master: bool) {
8586
self.enable_acting_master = enable_acting_master;
8687
}

0 commit comments

Comments
 (0)