1818use std:: collections:: HashMap ;
1919use std:: collections:: HashSet ;
2020use std:: net:: SocketAddr ;
21- use std:: time :: SystemTime ;
21+ use std:: sync :: Arc ;
2222
2323use cheetah_string:: CheetahString ;
2424use rocketmq_common:: common:: config:: TopicConfig ;
@@ -28,6 +28,7 @@ use rocketmq_common::common::namesrv::namesrv_config::NamesrvConfig;
2828use rocketmq_common:: common:: topic:: TopicValidator ;
2929use rocketmq_common:: common:: TopicSysFlag ;
3030use rocketmq_common:: TimeUtils ;
31+ use rocketmq_common:: TimeUtils :: get_current_millis;
3132use rocketmq_remoting:: clients:: rocketmq_default_impl:: RocketmqDefaultClient ;
3233use rocketmq_remoting:: clients:: RemotingClient ;
3334use rocketmq_remoting:: code:: request_code:: RequestCode ;
@@ -82,6 +83,7 @@ pub struct RouteInfoManager {
8283 pub ( crate ) topic_queue_mapping_info_table : TopicQueueMappingInfoTable ,
8384 pub ( crate ) namesrv_config : ArcMut < NamesrvConfig > ,
8485 pub ( crate ) remoting_client : ArcMut < RocketmqDefaultClient > ,
86+ lock : Arc < parking_lot:: RwLock < ( ) > > ,
8587}
8688
8789#[ allow( private_interfaces) ]
@@ -99,14 +101,15 @@ impl RouteInfoManager {
99101 topic_queue_mapping_info_table : ArcMut :: new ( HashMap :: new ( ) ) ,
100102 namesrv_config,
101103 remoting_client,
104+ lock : Arc :: new ( Default :: default ( ) ) ,
102105 }
103106 }
104107}
105108
106109//impl register broker
107110impl RouteInfoManager {
108111 pub fn register_broker (
109- & mut self ,
112+ & self ,
110113 cluster_name : CheetahString ,
111114 broker_addr : CheetahString ,
112115 broker_name : CheetahString ,
@@ -120,35 +123,39 @@ impl RouteInfoManager {
120123 remote_addr : SocketAddr ,
121124 ) -> Option < RegisterBrokerResult > {
122125 let mut result = RegisterBrokerResult :: default ( ) ;
126+ let _write = self . lock . write ( ) ;
123127 //init or update cluster information
124- if !self . cluster_addr_table . contains_key ( & cluster_name) {
125- self . cluster_addr_table
126- . insert ( cluster_name. clone ( ) , HashSet :: new ( ) ) ;
127- }
128128 self . cluster_addr_table
129- . get_mut ( & cluster_name)
130- . unwrap ( )
129+ . mut_from_ref ( )
130+ . entry ( cluster_name. clone ( ) )
131+ . or_default ( )
131132 . insert ( broker_name. clone ( ) ) ;
132133
133134 let enable_acting_master_inner = enable_acting_master. unwrap_or_default ( ) ;
134- let mut register_first =
135- if let Some ( broker_data) = self . broker_addr_table . get_mut ( & broker_name) {
136- broker_data. set_enable_acting_master ( enable_acting_master_inner) ;
137- broker_data. set_zone_name ( zone_name. clone ( ) ) ;
138- false
139- } else {
140- let mut broker_data = BrokerData :: new (
141- cluster_name. clone ( ) ,
142- broker_name. clone ( ) ,
143- HashMap :: new ( ) ,
144- zone_name,
145- ) ;
146- broker_data. set_enable_acting_master ( enable_acting_master_inner) ;
147- self . broker_addr_table
148- . insert ( broker_name. clone ( ) , broker_data) ;
149- true
150- } ;
151- let broker_data = self . broker_addr_table . get_mut ( & broker_name) . unwrap ( ) ;
135+ let mut register_first = if let Some ( broker_data) =
136+ self . broker_addr_table . mut_from_ref ( ) . get_mut ( & broker_name)
137+ {
138+ broker_data. set_enable_acting_master ( enable_acting_master_inner) ;
139+ broker_data. set_zone_name ( zone_name. clone ( ) ) ;
140+ false
141+ } else {
142+ let mut broker_data = BrokerData :: new (
143+ cluster_name. clone ( ) ,
144+ broker_name. clone ( ) ,
145+ HashMap :: new ( ) ,
146+ zone_name,
147+ ) ;
148+ broker_data. set_enable_acting_master ( enable_acting_master_inner) ;
149+ self . broker_addr_table
150+ . mut_from_ref ( )
151+ . insert ( broker_name. clone ( ) , broker_data) ;
152+ true
153+ } ;
154+ let broker_data = self
155+ . broker_addr_table
156+ . mut_from_ref ( )
157+ . get_mut ( & broker_name)
158+ . unwrap ( ) ;
152159 let mut prev_min_broker_id = 0i64 ;
153160 if !broker_data. broker_addrs ( ) . is_empty ( ) {
154161 prev_min_broker_id = broker_data. broker_addrs ( ) . keys ( ) . min ( ) . copied ( ) . unwrap ( ) ;
@@ -186,7 +193,7 @@ impl RouteInfoManager {
186193 & broker_addr,
187194 new_state_version
188195 ) ;
189- self . broker_live_table . remove (
196+ self . broker_live_table . mut_from_ref ( ) . remove (
190197 BrokerAddrInfo :: new ( cluster_name. clone ( ) , broker_addr. clone ( ) ) . as_ref ( ) ,
191198 ) ;
192199 return Some ( result) ;
@@ -244,7 +251,10 @@ impl RouteInfoManager {
244251 . map ( |item| item. to_string ( ) )
245252 . collect :: < HashSet < String > > ( ) ;
246253 for to_delete_topic in to_delete_topics {
247- let queue_data_map = self . topic_queue_table . get_mut ( to_delete_topic. as_str ( ) ) ;
254+ let queue_data_map = self
255+ . topic_queue_table
256+ . mut_from_ref ( )
257+ . get_mut ( to_delete_topic. as_str ( ) ) ;
248258 if let Some ( queue_data) = queue_data_map {
249259 let removed_qd = queue_data. remove ( & broker_name) ;
250260 if let Some ( ref removed_qd_inner) = removed_qd {
@@ -254,7 +264,9 @@ impl RouteInfoManager {
254264 ) ;
255265 }
256266 if queue_data. is_empty ( ) {
257- self . topic_queue_table . remove ( to_delete_topic. as_str ( ) ) ;
267+ self . topic_queue_table
268+ . mut_from_ref ( )
269+ . remove ( to_delete_topic. as_str ( ) ) ;
258270 }
259271 }
260272 }
@@ -285,9 +297,11 @@ impl RouteInfoManager {
285297 for ( topic, vtq_info) in topic_queue_mapping_info_map {
286298 if !self . topic_queue_mapping_info_table . contains_key ( topic) {
287299 self . topic_queue_mapping_info_table
300+ . mut_from_ref ( )
288301 . insert ( topic. clone ( ) , HashMap :: new ( ) ) ;
289302 }
290303 self . topic_queue_mapping_info_table
304+ . mut_from_ref ( )
291305 . get_mut ( topic)
292306 . unwrap ( )
293307 . insert ( vtq_info. bname . as_ref ( ) . unwrap ( ) . clone ( ) , vtq_info. clone ( ) ) ;
@@ -297,13 +311,10 @@ impl RouteInfoManager {
297311
298312 let broker_addr_info = BrokerAddrInfo :: new ( cluster_name. clone ( ) , broker_addr. clone ( ) ) ;
299313
300- self . broker_live_table . insert (
314+ self . broker_live_table . mut_from_ref ( ) . insert (
301315 broker_addr_info. clone ( ) ,
302316 BrokerLiveInfo :: new (
303- SystemTime :: now ( )
304- . duration_since ( SystemTime :: UNIX_EPOCH )
305- . expect ( "Time went backwards" )
306- . as_millis ( ) as i64 ,
317+ get_current_millis ( ) as i64 ,
307318 DEFAULT_BROKER_CHANNEL_EXPIRED_TIME ,
308319 topic_config_serialize_wrapper
309320 . topic_config_serialize_wrapper
@@ -314,9 +325,12 @@ impl RouteInfoManager {
314325 ) ,
315326 ) ;
316327 if filter_server_list. is_empty ( ) {
317- self . filter_server_table . remove ( & broker_addr_info) ;
328+ self . filter_server_table
329+ . mut_from_ref ( )
330+ . remove ( & broker_addr_info) ;
318331 } else {
319332 self . filter_server_table
333+ . mut_from_ref ( )
320334 . insert ( broker_addr_info. clone ( ) , filter_server_list) ;
321335 }
322336
@@ -345,6 +359,7 @@ impl RouteInfoManager {
345359 ) ,
346360 )
347361 }
362+ drop ( _write) ;
348363 Some ( result)
349364 }
350365}
@@ -471,7 +486,7 @@ impl RouteInfoManager {
471486}
472487
473488impl RouteInfoManager {
474- fn topic_set_of_broker_name ( & mut self , broker_name : & str ) -> HashSet < String > {
489+ fn topic_set_of_broker_name ( & self , broker_name : & str ) -> HashSet < String > {
475490 let mut topic_of_broker = HashSet :: new ( ) ;
476491 for ( key, value) in self . topic_queue_table . iter ( ) {
477492 if value. contains_key ( broker_name) {
@@ -482,7 +497,7 @@ impl RouteInfoManager {
482497 }
483498
484499 pub ( crate ) fn is_topic_config_changed (
485- & mut self ,
500+ & self ,
486501 cluster_name : & CheetahString ,
487502 broker_addr : & CheetahString ,
488503 data_version : & DataVersion ,
@@ -532,11 +547,7 @@ impl RouteInfoManager {
532547 None
533548 }
534549
535- fn create_and_update_queue_data (
536- & mut self ,
537- broker_name : & CheetahString ,
538- topic_config : TopicConfig ,
539- ) {
550+ fn create_and_update_queue_data ( & self , broker_name : & CheetahString , topic_config : TopicConfig ) {
540551 let queue_data = QueueData :: new (
541552 broker_name. clone ( ) ,
542553 topic_config. write_queue_nums ,
@@ -547,6 +558,7 @@ impl RouteInfoManager {
547558
548559 let queue_data_map = self
549560 . topic_queue_table
561+ . mut_from_ref ( )
550562 . get_mut ( topic_config. topic_name . as_ref ( ) . unwrap ( ) . as_str ( ) ) ;
551563 if let Some ( queue_data_map_inner) = queue_data_map {
552564 let existed_qd = queue_data_map_inner. get ( broker_name) ;
@@ -572,15 +584,15 @@ impl RouteInfoManager {
572584 & queue_data
573585 ) ;
574586 queue_data_map_inner. insert ( broker_name. clone ( ) , queue_data) ;
575- self . topic_queue_table . insert (
587+ self . topic_queue_table . mut_from_ref ( ) . insert (
576588 topic_config. topic_name . as_ref ( ) . unwrap ( ) . clone ( ) ,
577589 queue_data_map_inner,
578590 ) ;
579591 }
580592 }
581593
582594 fn notify_min_broker_id_changed (
583- & mut self ,
595+ & self ,
584596 broker_addr_map : & HashMap < i64 , CheetahString > ,
585597 offline_broker_addr : Option < CheetahString > ,
586598 ha_broker_addr : Option < CheetahString > ,
@@ -622,7 +634,7 @@ impl RouteInfoManager {
622634 }
623635
624636 fn choose_broker_addrs_to_notify (
625- & mut self ,
637+ & self ,
626638 broker_addr_map : & HashMap < i64 , CheetahString > ,
627639 offline_broker_addr : Option < CheetahString > ,
628640 ) -> Option < Vec < CheetahString > > {
0 commit comments