1717
1818use std:: collections:: HashSet ;
1919use std:: sync:: atomic:: AtomicBool ;
20+ use std:: sync:: atomic:: Ordering ;
2021use std:: sync:: Arc ;
2122
2223use rocketmq_common:: common:: base:: service_state:: ServiceState ;
@@ -44,7 +45,7 @@ use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageConcu
4445use crate :: consumer:: consumer_impl:: consume_message_service:: ConsumeMessageOrderlyServiceGeneral ;
4546use crate :: consumer:: consumer_impl:: consume_message_service:: ConsumeMessageServiceTrait ;
4647use crate :: consumer:: consumer_impl:: pull_api_wrapper:: PullAPIWrapper ;
47- use crate :: consumer:: consumer_impl:: rebalance_push_impl:: RebalancePushImpl ;
48+ use crate :: consumer:: consumer_impl:: re_balance :: rebalance_push_impl:: RebalancePushImpl ;
4849use crate :: consumer:: default_mq_push_consumer:: ConsumerConfig ;
4950use crate :: consumer:: listener:: message_listener:: MessageListener ;
5051use crate :: consumer:: mq_consumer_inner:: MQConsumerInner ;
@@ -76,7 +77,7 @@ pub struct DefaultMQPushConsumerImpl {
7677 filter_message_hook_list : Vec < Arc < Box < dyn FilterMessageHook + Send + Sync > > > ,
7778 rpc_hook : Option < Arc < Box < dyn RPCHook > > > ,
7879 service_state : ServiceState ,
79- mq_client_factory : Option < ArcRefCellWrapper < MQClientInstance > > ,
80+ mq_client_factory : Option < ArcRefCellWrapper < MQClientInstance < DefaultMQPushConsumerImpl > > > ,
8081 pull_api_wrapper : Option < ArcRefCellWrapper < PullAPIWrapper > > ,
8182 pause : Arc < AtomicBool > ,
8283 consume_orderly : bool ,
@@ -109,10 +110,13 @@ impl DefaultMQPushConsumerImpl {
109110 consumer_config : ConsumerConfig ,
110111 rpc_hook : Option < Arc < Box < dyn RPCHook > > > ,
111112 ) -> Self {
112- Self {
113- client_config,
114- consumer_config,
115- rebalance_impl : ArcRefCellWrapper :: new ( RebalancePushImpl ) ,
113+ let mut this = Self {
114+ client_config : client_config. clone ( ) ,
115+ consumer_config : consumer_config. clone ( ) ,
116+ rebalance_impl : ArcRefCellWrapper :: new ( RebalancePushImpl :: new (
117+ client_config,
118+ consumer_config,
119+ ) ) ,
116120 filter_message_hook_list : vec ! [ ] ,
117121 rpc_hook,
118122 service_state : ServiceState :: CreateJust ,
@@ -129,7 +133,10 @@ impl DefaultMQPushConsumerImpl {
129133 pop_delay_level : Arc :: new ( [
130134 10 , 30 , 60 , 120 , 180 , 240 , 300 , 360 , 420 , 480 , 540 , 600 , 1200 , 1800 , 3600 , 7200 ,
131135 ] ) ,
132- }
136+ } ;
137+ let wrapper = ArcRefCellWrapper :: downgrade ( & this. rebalance_impl ) ;
138+ this. rebalance_impl . set_rebalance_impl ( wrapper) ;
139+ this
133140 }
134141}
135142
@@ -138,7 +145,7 @@ impl DefaultMQPushConsumerImpl {
138145 match self . service_state {
139146 ServiceState :: CreateJust => {
140147 info ! (
141- "the consumer [{}] start beginning. messageModel ={}, isUnitMode={}" ,
148+ "the consumer [{}] start beginning. message_model ={}, isUnitMode={}" ,
142149 self . consumer_config. consumer_group,
143150 self . consumer_config. message_model,
144151 self . consumer_config. unit_mode
@@ -160,7 +167,12 @@ impl DefaultMQPushConsumerImpl {
160167 self . rebalance_impl
161168 . set_message_model ( self . consumer_config . message_model ) ;
162169 self . rebalance_impl . set_allocate_message_queue_strategy (
163- self . consumer_config . allocate_message_queue_strategy . clone ( ) ,
170+ self . consumer_config
171+ . allocate_message_queue_strategy
172+ . clone ( )
173+ . expect (
174+ "allocate_message_queue_strategy is null, please set it before start" ,
175+ ) ,
164176 ) ;
165177 self . rebalance_impl
166178 . set_mq_client_factory ( client_instance. clone ( ) ) ;
@@ -249,7 +261,7 @@ impl DefaultMQPushConsumerImpl {
249261
250262 self . mq_client_factory . as_mut ( ) . unwrap ( ) . start ( ) . await ?;
251263 info ! (
252- "the consumer [{}] start OK, messageModel ={}, isUnitMode={}" ,
264+ "the consumer [{}] start OK, message_model ={}, isUnitMode={}" ,
253265 self . consumer_config. consumer_group,
254266 self . consumer_config. message_model,
255267 self . consumer_config. unit_mode
@@ -301,7 +313,7 @@ impl DefaultMQPushConsumerImpl {
301313 return Err ( MQClientError :: MQClientException (
302314 -1 ,
303315 format ! (
304- "consumerGroup is empty, {}" ,
316+ "consumer_group is empty, {}" ,
305317 FAQUrl :: suggest_todo( FAQUrl :: CLIENT_PARAMETER_CHECK_URL )
306318 ) ,
307319 ) ) ;
@@ -311,7 +323,7 @@ impl DefaultMQPushConsumerImpl {
311323 return Err ( MQClientError :: MQClientException (
312324 -1 ,
313325 format ! (
314- "consumerGroup can not equal {} please specify another one.{}" ,
326+ "consumer_group can not equal {} please specify another one.{}" ,
315327 DEFAULT_CONSUMER_GROUP ,
316328 FAQUrl :: suggest_todo( FAQUrl :: CLIENT_PARAMETER_CHECK_URL )
317329 ) ,
@@ -340,7 +352,7 @@ impl DefaultMQPushConsumerImpl {
340352 return Err ( MQClientError :: MQClientException (
341353 -1 ,
342354 format ! (
343- "allocateMessageQueueStrategy is null{}" ,
355+ "allocate_message_queue_strategy is null{}" ,
344356 FAQUrl :: suggest_todo( FAQUrl :: CLIENT_PARAMETER_CHECK_URL )
345357 ) ,
346358 ) ) ;
@@ -611,8 +623,11 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl {
611623 todo ! ( )
612624 }
613625
614- fn try_rebalance ( & self ) -> bool {
615- todo ! ( )
626+ async fn try_rebalance ( & self ) -> Result < bool > {
627+ if !self . pause . load ( Ordering :: Acquire ) {
628+ //self.rebalance_impl.do
629+ }
630+ unimplemented ! ( )
616631 }
617632
618633 fn persist_consumer_offset ( & self ) {
0 commit comments