Skip to content

Commit 4e333ce

Browse files
authored
[ISSUE #1275]⚡️Optimize name server DefaultRequestProcessor#get_broker_member_group (#1276)
1 parent 6c37e1e commit 4e333ce

9 files changed

Lines changed: 46 additions & 50 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl BrokerOuterAPI {
167167
if !name_server_address_list.is_empty() {
168168
let mut request_header = RegisterBrokerRequestHeader {
169169
broker_addr,
170-
broker_id: broker_id as i64,
170+
broker_id,
171171
broker_name,
172172
cluster_name,
173173
ha_server_addr,

rocketmq-client/src/factory/mq_client_instance.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub struct MQClientInstance {
104104
rebalance_service: RebalanceService,
105105
pub(crate) default_producer: ArcMut<DefaultMQProducer>,
106106
instance_runtime: Arc<RocketMQRuntime>,
107-
broker_addr_table: Arc<RwLock<HashMap<CheetahString, HashMap<i64, CheetahString>>>>,
107+
broker_addr_table: Arc<RwLock<HashMap<CheetahString, HashMap<u64, CheetahString>>>>,
108108
broker_version_table: Arc<
109109
RwLock<
110110
HashMap<
@@ -752,7 +752,7 @@ impl MQClientInstance {
752752
let guard = self.broker_addr_table.read().await;
753753
let map = guard.get(broker_name);
754754
if let Some(map) = map {
755-
return map.get(&(mix_all::MASTER_ID as i64)).cloned();
755+
return map.get(&(mix_all::MASTER_ID)).cloned();
756756
}
757757
None
758758
}
@@ -784,7 +784,7 @@ impl MQClientInstance {
784784
if addr.is_empty() {
785785
continue;
786786
}
787-
if consumer_empty && *id != mix_all::MASTER_ID as i64 {
787+
if consumer_empty && *id != mix_all::MASTER_ID {
788788
continue;
789789
}
790790
self.send_heartbeat_to_broker_inner(*id, broker_name, addr, &heartbeat_data)
@@ -797,7 +797,7 @@ impl MQClientInstance {
797797

798798
pub async fn send_heartbeat_to_broker(
799799
&self,
800-
id: i64,
800+
id: u64,
801801
broker_name: &CheetahString,
802802
addr: &CheetahString,
803803
) -> bool {
@@ -826,7 +826,7 @@ impl MQClientInstance {
826826

827827
async fn send_heartbeat_to_broker_inner(
828828
&self,
829-
id: i64,
829+
id: u64,
830830
broker_name: &CheetahString,
831831
addr: &CheetahString,
832832
heartbeat_data: &HeartbeatData,
@@ -1047,17 +1047,17 @@ impl MQClientInstance {
10471047
let mut found = false;
10481048

10491049
if let Some(map) = map {
1050-
broker_addr = map.get(&(broker_id as i64));
1050+
broker_addr = map.get(&broker_id);
10511051
slave = broker_id != mix_all::MASTER_ID;
10521052
found = broker_addr.is_some();
10531053
if !found && slave {
1054-
broker_addr = map.get(&((broker_id + 1) as i64));
1054+
broker_addr = map.get(&(broker_id + 1));
10551055
found = broker_addr.is_some();
10561056
}
10571057
if !found && !only_this_broker {
10581058
if let Some((key, value)) = map.iter().next() {
10591059
//broker_addr = Some(value.clone());
1060-
slave = *key != mix_all::MASTER_ID as i64;
1060+
slave = *key != mix_all::MASTER_ID;
10611061
found = !value.is_empty();
10621062
}
10631063
}
@@ -1217,7 +1217,7 @@ pub fn topic_route_data2topic_publish_info(
12171217
.as_ref()
12181218
.unwrap()
12191219
.broker_addrs()
1220-
.contains_key(&(mix_all::MASTER_ID as i64))
1220+
.contains_key(&(mix_all::MASTER_ID))
12211221
{
12221222
continue;
12231223
}

rocketmq-namesrv/src/route/route_info_manager.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl RouteInfoManager {
113113
cluster_name: CheetahString,
114114
broker_addr: CheetahString,
115115
broker_name: CheetahString,
116-
broker_id: i64,
116+
broker_id: u64,
117117
ha_server_addr: CheetahString,
118118
zone_name: Option<CheetahString>,
119119
_timeout_millis: Option<i64>,
@@ -156,7 +156,7 @@ impl RouteInfoManager {
156156
.mut_from_ref()
157157
.get_mut(&broker_name)
158158
.unwrap();
159-
let mut prev_min_broker_id = 0i64;
159+
let mut prev_min_broker_id = 0;
160160
if !broker_data.broker_addrs().is_empty() {
161161
prev_min_broker_id = broker_data.broker_addrs().keys().min().copied().unwrap();
162162
}
@@ -222,7 +222,7 @@ impl RouteInfoManager {
222222
.insert(broker_id, broker_addr.clone());
223223

224224
register_first |= old_addr.is_none();
225-
let is_master = mix_all::MASTER_ID == broker_id as u64;
225+
let is_master = mix_all::MASTER_ID == broker_id;
226226

227227
let is_prime_slave = enable_acting_master.is_some()
228228
&& !is_master
@@ -334,8 +334,8 @@ impl RouteInfoManager {
334334
.insert(broker_addr_info.clone(), filter_server_list);
335335
}
336336

337-
if mix_all::MASTER_ID != broker_id as u64 {
338-
let master_address = broker_data.broker_addrs().get(&(mix_all::MASTER_ID as i64));
337+
if mix_all::MASTER_ID != broker_id {
338+
let master_address = broker_data.broker_addrs().get(&(mix_all::MASTER_ID));
339339
if let Some(master_addr) = master_address {
340340
let master_livie_info = self
341341
.broker_live_table
@@ -440,7 +440,7 @@ impl RouteInfoManager {
440440
!broker_data.broker_addrs().is_empty()
441441
&& !broker_data
442442
.broker_addrs()
443-
.contains_key(&(mix_all::MASTER_ID as i64))
443+
.contains_key(&(mix_all::MASTER_ID))
444444
});
445445

446446
if !need_acting_master {
@@ -451,7 +451,7 @@ impl RouteInfoManager {
451451
if broker_data.broker_addrs().is_empty()
452452
|| broker_data
453453
.broker_addrs()
454-
.contains_key(&(mix_all::MASTER_ID as i64))
454+
.contains_key(&(mix_all::MASTER_ID))
455455
|| !broker_data.enable_acting_master()
456456
{
457457
continue;
@@ -469,7 +469,7 @@ impl RouteInfoManager {
469469
{
470470
broker_data
471471
.broker_addrs_mut()
472-
.insert(mix_all::MASTER_ID as i64, acting_master_addr);
472+
.insert(mix_all::MASTER_ID, acting_master_addr);
473473
}
474474
}
475475
}
@@ -593,7 +593,7 @@ impl RouteInfoManager {
593593

594594
fn notify_min_broker_id_changed(
595595
&self,
596-
broker_addr_map: &HashMap<i64, CheetahString>,
596+
broker_addr_map: &HashMap<u64, CheetahString>,
597597
offline_broker_addr: Option<CheetahString>,
598598
ha_broker_addr: Option<CheetahString>,
599599
) {
@@ -635,7 +635,7 @@ impl RouteInfoManager {
635635

636636
fn choose_broker_addrs_to_notify(
637637
&self,
638-
broker_addr_map: &HashMap<i64, CheetahString>,
638+
broker_addr_map: &HashMap<u64, CheetahString>,
639639
offline_broker_addr: Option<CheetahString>,
640640
) -> Option<Vec<CheetahString>> {
641641
if broker_addr_map.len() == 1 || offline_broker_addr.is_some() {
@@ -667,12 +667,11 @@ impl RouteInfoManager {
667667
broker_name: &CheetahString,
668668
) -> Option<BrokerMemberGroup> {
669669
let mut group_member = BrokerMemberGroup::new(cluster_name.clone(), broker_name.clone());
670+
let lock_ = self.lock.read();
670671
if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
671-
let map = broker_data.broker_addrs().clone();
672-
for (key, value) in map {
673-
group_member.broker_addrs.insert(key as u64, value);
674-
}
672+
group_member.broker_addrs = broker_data.broker_addrs().clone();
675673
}
674+
drop(lock_);
676675
Some(group_member)
677676
}
678677

@@ -913,7 +912,7 @@ impl RouteInfoManager {
913912
if &broker_addr_info.broker_addr == ip {
914913
un_register_request.broker_name =
915914
CheetahString::from_string(broker_data.broker_name().to_string());
916-
un_register_request.broker_id = *broker_id as u64;
915+
un_register_request.broker_id = *broker_id;
917916
return true;
918917
}
919918
}
@@ -949,7 +948,7 @@ impl RouteInfoManager {
949948

950949
if let Some(broker_data) = self.broker_addr_table.get_mut(broker_name.as_str()) {
951950
if !broker_data.broker_addrs().is_empty()
952-
&& un_register_request.broker_id as i64
951+
&& un_register_request.broker_id
953952
== broker_data
954953
.broker_addrs()
955954
.iter()

rocketmq-namesrv/src/route_info/broker_addr_info.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ impl Display for BrokerAddrInfo {
4848

4949
#[derive(Clone, Debug)]
5050
pub(crate) struct BrokerStatusChangeInfo {
51-
pub(crate) broker_addrs: HashMap<i64, CheetahString>,
51+
pub(crate) broker_addrs: HashMap<u64, CheetahString>,
5252
pub(crate) offline_broker_addr: CheetahString,
5353
pub(crate) ha_broker_addr: CheetahString,
5454
}
5555

5656
impl BrokerStatusChangeInfo {
5757
fn new(
58-
broker_addrs: HashMap<i64, CheetahString>,
58+
broker_addrs: HashMap<u64, CheetahString>,
5959
offline_broker_addr: CheetahString,
6060
ha_broker_addr: CheetahString,
6161
) -> Self {
@@ -66,11 +66,11 @@ impl BrokerStatusChangeInfo {
6666
}
6767
}
6868

69-
fn get_broker_addrs(&self) -> &HashMap<i64, CheetahString> {
69+
fn get_broker_addrs(&self) -> &HashMap<u64, CheetahString> {
7070
&self.broker_addrs
7171
}
7272

73-
fn set_broker_addrs(&mut self, broker_addrs: HashMap<i64, CheetahString>) {
73+
fn set_broker_addrs(&mut self, broker_addrs: HashMap<u64, CheetahString>) {
7474
self.broker_addrs = broker_addrs;
7575
}
7676

rocketmq-remoting/src/protocol/header/namesrv/brokerid_change_request_header.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::protocol::command_custom_header::FromMap;
1111
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
1212
pub struct NotifyMinBrokerIdChangeRequestHeader {
1313
#[serde(rename = "minBrokerId")]
14-
pub min_broker_id: Option<i64>,
14+
pub min_broker_id: Option<u64>,
1515

1616
#[serde(rename = "brokerName")]
1717
pub broker_name: Option<CheetahString>,
@@ -34,7 +34,7 @@ impl NotifyMinBrokerIdChangeRequestHeader {
3434
const OFFLINE_BROKER_ADDR: &'static str = "offlineBrokerAddr";
3535

3636
pub fn new(
37-
min_broker_id: Option<i64>,
37+
min_broker_id: Option<u64>,
3838
broker_name: Option<CheetahString>,
3939
min_broker_addr: Option<CheetahString>,
4040
offline_broker_addr: Option<CheetahString>,
@@ -59,7 +59,7 @@ impl FromMap for NotifyMinBrokerIdChangeRequestHeader {
5959
.get(&CheetahString::from_static_str(
6060
NotifyMinBrokerIdChangeRequestHeader::MIN_BROKER_ID,
6161
))
62-
.and_then(|s| s.parse::<i64>().ok()),
62+
.and_then(|s| s.parse::<u64>().ok()),
6363
broker_name: map
6464
.get(&CheetahString::from_static_str(
6565
NotifyMinBrokerIdChangeRequestHeader::BROKER_NAME,

rocketmq-remoting/src/protocol/header/namesrv/register_broker_header.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub struct RegisterBrokerRequestHeader {
4646

4747
/// The unique identifier for the broker.
4848
#[serde(rename = "brokerId")]
49-
pub broker_id: i64,
49+
pub broker_id: u64,
5050

5151
/// The optional heartbeat timeout in milliseconds.
5252
#[serde(rename = "heartbeatTimeoutMillis")]
@@ -98,7 +98,7 @@ impl RegisterBrokerRequestHeader {
9898
broker_addr: CheetahString,
9999
cluster_name: CheetahString,
100100
ha_server_addr: CheetahString,
101-
broker_id: i64,
101+
broker_id: u64,
102102
heartbeat_timeout_millis: Option<i64>,
103103
enable_acting_master: Option<bool>,
104104
compressed: bool,
@@ -151,7 +151,7 @@ impl FromMap for RegisterBrokerRequestHeader {
151151
.get(&CheetahString::from_static_str(
152152
RegisterBrokerRequestHeader::BROKER_ID,
153153
))
154-
.and_then(|s| s.parse::<i64>().ok())
154+
.and_then(|s| s.parse::<u64>().ok())
155155
.unwrap_or(0),
156156
heartbeat_timeout_millis: map
157157
.get(&CheetahString::from_static_str(

rocketmq-remoting/src/protocol/route/route_data_view.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct BrokerData {
3030
#[serde(rename = "brokerName")]
3131
broker_name: CheetahString,
3232
#[serde(rename = "brokerAddrs")]
33-
broker_addrs: HashMap<i64 /* broker id */, CheetahString /* broker ip */>,
33+
broker_addrs: HashMap<u64 /* broker id */, CheetahString /* broker ip */>,
3434
#[serde(rename = "zoneName")]
3535
zone_name: Option<CheetahString>,
3636
#[serde(rename = "enableActingMaster")]
@@ -53,7 +53,7 @@ impl BrokerData {
5353
pub fn new(
5454
cluster: CheetahString,
5555
broker_name: CheetahString,
56-
broker_addrs: HashMap<i64, CheetahString>,
56+
broker_addrs: HashMap<u64, CheetahString>,
5757
zone_name: Option<CheetahString>,
5858
) -> BrokerData {
5959
BrokerData {
@@ -73,7 +73,7 @@ impl BrokerData {
7373
self.broker_name = broker_name;
7474
}
7575

76-
pub fn set_broker_addrs(&mut self, broker_addrs: HashMap<i64, CheetahString>) {
76+
pub fn set_broker_addrs(&mut self, broker_addrs: HashMap<u64, CheetahString>) {
7777
self.broker_addrs = broker_addrs;
7878
}
7979

@@ -94,15 +94,15 @@ impl BrokerData {
9494
&self.broker_name
9595
}
9696

97-
pub fn broker_addrs(&self) -> &HashMap<i64, CheetahString> {
97+
pub fn broker_addrs(&self) -> &HashMap<u64, CheetahString> {
9898
&self.broker_addrs
9999
}
100100

101-
pub fn broker_addrs_mut(&mut self) -> &mut HashMap<i64, CheetahString> {
101+
pub fn broker_addrs_mut(&mut self) -> &mut HashMap<u64, CheetahString> {
102102
&mut self.broker_addrs
103103
}
104104

105-
pub fn remove_broker_by_addr(&mut self, broker_id: i64, broker_addr: &str) {
105+
pub fn remove_broker_by_addr(&mut self, broker_id: u64, broker_addr: &str) {
106106
self.broker_addrs
107107
.retain(|key, value| value != broker_addr || *key == broker_id);
108108
}
@@ -116,7 +116,7 @@ impl BrokerData {
116116
}
117117

118118
pub fn select_broker_addr(&self) -> Option<CheetahString> {
119-
let master_address = self.broker_addrs.get(&(mix_all::MASTER_ID as i64)).cloned();
119+
let master_address = self.broker_addrs.get(&(mix_all::MASTER_ID)).cloned();
120120
if master_address.is_none() {
121121
return self
122122
.broker_addrs

rocketmq-remoting/src/rpc/client_metadata.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub struct ClientMetadata {
4343
RwLock<
4444
HashMap<
4545
CheetahString, /* Broker Name */
46-
HashMap<i64 /* brokerId */, CheetahString /* address */>,
46+
HashMap<u64 /* brokerId */, CheetahString /* address */>,
4747
>,
4848
>,
4949
>,
@@ -137,10 +137,7 @@ impl ClientMetadata {
137137
if !read_guard.contains_key(broker_name) {
138138
return None;
139139
}
140-
let broker_addr = read_guard
141-
.get(broker_name)
142-
.unwrap()
143-
.get(&(MASTER_ID as i64));
140+
let broker_addr = read_guard.get(broker_name).unwrap().get(&(MASTER_ID));
144141
broker_addr.cloned()
145142
}
146143

@@ -233,7 +230,7 @@ impl ClientMetadata {
233230

234231
pub fn broker_addr_table(
235232
&self,
236-
) -> Arc<RwLock<HashMap<CheetahString, HashMap<i64, CheetahString>>>> {
233+
) -> Arc<RwLock<HashMap<CheetahString, HashMap<u64, CheetahString>>>> {
237234
self.broker_addr_table.clone()
238235
}
239236
}

0 commit comments

Comments
 (0)