@@ -25,21 +25,43 @@ use crate::load_balance::message_request_mode_manager::MessageRequestModeManager
2525 use rocketmq_remoting:: protocol:: remoting_command:: RemotingCommand ;
2626 use rocketmq_remoting:: runtime:: connection_handler_context:: ConnectionHandlerContext ;
2727 use rocketmq_store:: config:: message_store_config:: MessageStoreConfig ;
28- use std:: collections:: HashMap ;
28+ use std:: collections:: { HashMap , HashSet } ;
2929 use std:: sync:: Arc ;
30+ use tracing:: warn;
31+ use rocketmq_common:: common:: broker:: broker_config:: BrokerConfig ;
32+ use rocketmq_common:: common:: message:: message_enum:: MessageRequestMode ;
33+ use rocketmq_common:: common:: message:: message_queue:: MessageQueue ;
34+ use rocketmq_common:: common:: message:: message_queue_assignment:: MessageQueueAssignment ;
35+ use rocketmq_common:: common:: mix_all;
3036use rocketmq_common:: common:: mix_all:: RETRY_GROUP_TOPIC_PREFIX ;
3137use rocketmq_remoting:: code:: response_code:: ResponseCode ;
38+ use rocketmq_remoting:: protocol:: body:: query_assignment_request_body:: QueryAssignmentRequestBody ;
39+ use rocketmq_remoting:: protocol:: body:: query_assignment_response_body:: QueryAssignmentResponseBody ;
3240use rocketmq_remoting:: protocol:: body:: set_message_request_mode_request_body:: SetMessageRequestModeRequestBody ;
33- use rocketmq_remoting:: protocol:: RemotingDeserializable ;
41+ use rocketmq_remoting:: protocol:: heartbeat:: message_model:: MessageModel ;
42+ use rocketmq_remoting:: protocol:: { RemotingDeserializable , RemotingSerializable } ;
43+ use crate :: client:: manager:: consumer_manager:: ConsumerManager ;
44+ use crate :: error:: BrokerError ;
45+ use crate :: error:: BrokerError :: IllegalArgumentError ;
46+ use crate :: topic:: manager:: topic_route_info_manager:: TopicRouteInfoManager ;
47+ use crate :: Result ;
3448
3549pub struct QueryAssignmentProcessor {
3650 message_request_mode_manager : MessageRequestModeManager ,
3751 load_strategy : HashMap < CheetahString , Arc < dyn AllocateMessageQueueStrategy > > ,
3852 message_store_config : Arc < MessageStoreConfig > ,
53+ broker_config : Arc < BrokerConfig > ,
54+ topic_route_info_manager : Arc < TopicRouteInfoManager > ,
55+ consumer_manager : Arc < ConsumerManager > ,
3956}
4057
4158impl QueryAssignmentProcessor {
42- pub fn new ( message_store_config : Arc < MessageStoreConfig > ) -> Self {
59+ pub fn new (
60+ message_store_config : Arc < MessageStoreConfig > ,
61+ broker_config : Arc < BrokerConfig > ,
62+ topic_route_info_manager : Arc < TopicRouteInfoManager > ,
63+ consumer_manager : Arc < ConsumerManager > ,
64+ ) -> Self {
4365 let allocate_message_queue_averagely: Arc < dyn AllocateMessageQueueStrategy > =
4466 Arc :: new ( AllocateMessageQueueAveragely ) ;
4567 let allocate_message_queue_averagely_by_circle: Arc < dyn AllocateMessageQueueStrategy > =
@@ -59,6 +81,9 @@ impl QueryAssignmentProcessor {
5981 message_request_mode_manager : manager,
6082 load_strategy,
6183 message_store_config,
84+ broker_config,
85+ topic_route_info_manager,
86+ consumer_manager,
6287 }
6388 }
6489}
@@ -82,11 +107,220 @@ impl QueryAssignmentProcessor {
82107
83108 async fn query_assignment (
84109 & mut self ,
85- _channel : Channel ,
110+ channel : Channel ,
86111 _ctx : ConnectionHandlerContext ,
87- _request : RemotingCommand ,
112+ request : RemotingCommand ,
88113 ) -> Option < RemotingCommand > {
89- unimplemented ! ( )
114+ let request_body =
115+ QueryAssignmentRequestBody :: decode ( request. get_body ( ) . expect ( "empty body" ) )
116+ . expect ( "decode QueryAssignmentRequestBody failed" ) ;
117+ let set_message_request_mode_request_body = self
118+ . message_request_mode_manager
119+ . get_message_request_mode ( & request_body. topic , & request_body. consumer_group ) ;
120+ let set_message_request_mode_request_body =
121+ if let Some ( set_message_request_mode_request_body) =
122+ set_message_request_mode_request_body
123+ {
124+ set_message_request_mode_request_body
125+ } else {
126+ let mut body = SetMessageRequestModeRequestBody {
127+ topic : request_body. topic . clone ( ) ,
128+ consumer_group : request_body. consumer_group . clone ( ) ,
129+ ..Default :: default ( )
130+ } ;
131+ if request_body. topic . starts_with ( RETRY_GROUP_TOPIC_PREFIX ) {
132+ body. mode = MessageRequestMode :: Pull ;
133+ } else {
134+ body. mode = self . broker_config . default_message_request_mode ;
135+ }
136+ if body. mode == MessageRequestMode :: Pop {
137+ body. pop_share_queue_num = self . broker_config . default_pop_share_queue_num ;
138+ }
139+ body
140+ } ;
141+ let mode = set_message_request_mode_request_body. mode ;
142+ let message_queues = self
143+ . do_load_balance (
144+ & request_body. topic ,
145+ & request_body. consumer_group ,
146+ & request_body. client_id ,
147+ request_body. message_model ,
148+ & request_body. strategy_name ,
149+ set_message_request_mode_request_body,
150+ channel,
151+ )
152+ . await ;
153+ let assignments = if let Some ( message_queues) = message_queues {
154+ message_queues
155+ . into_iter ( )
156+ . map ( |mq| MessageQueueAssignment {
157+ message_queue : Some ( mq) ,
158+ mode,
159+ attachments : None ,
160+ } )
161+ . collect ( )
162+ } else {
163+ HashSet :: new ( )
164+ } ;
165+ let body = QueryAssignmentResponseBody {
166+ message_queue_assignments : assignments,
167+ } ;
168+ Some ( RemotingCommand :: create_response_command ( ) . set_body ( body. encode ( ) ) )
169+ }
170+
171+ async fn do_load_balance (
172+ & mut self ,
173+ topic : & CheetahString ,
174+ consumer_group : & CheetahString ,
175+ client_id : & CheetahString ,
176+ message_model : MessageModel ,
177+ strategy_name : & CheetahString ,
178+ set_message_request_mode_request_body : SetMessageRequestModeRequestBody ,
179+ channel : Channel ,
180+ ) -> Option < HashSet < MessageQueue > > {
181+ match message_model {
182+ MessageModel :: Broadcasting => {
183+ let assigned_queue_set = self
184+ . topic_route_info_manager
185+ . get_topic_subscribe_info ( topic)
186+ . await ;
187+ if assigned_queue_set. is_none ( ) {
188+ warn ! (
189+ "QueryLoad: no assignment for group[{}], the topic[{}] does not exist." ,
190+ consumer_group, topic
191+ ) ;
192+ }
193+ assigned_queue_set
194+ }
195+ MessageModel :: Clustering => {
196+ let mq_set = if mix_all:: is_lmq ( Some ( topic. as_str ( ) ) ) {
197+ let mut set = HashSet :: new ( ) ;
198+ let queue = MessageQueue :: from_parts (
199+ topic. clone ( ) ,
200+ self . broker_config . broker_name . clone ( ) ,
201+ mix_all:: LMQ_QUEUE_ID as i32 ,
202+ ) ;
203+ set. insert ( queue) ;
204+ Some ( set)
205+ } else {
206+ self . topic_route_info_manager
207+ . get_topic_subscribe_info ( topic)
208+ . await
209+ } ;
210+
211+ if mq_set. is_none ( ) || mq_set. as_ref ( ) . unwrap ( ) . is_empty ( ) {
212+ if topic. starts_with ( RETRY_GROUP_TOPIC_PREFIX ) {
213+ warn ! (
214+ "QueryLoad: no assignment for group[{}], the topic[{}] does not exist." ,
215+ consumer_group, topic
216+ ) ;
217+ }
218+ return None ;
219+ }
220+
221+ if !self . broker_config . server_load_balancer_enable {
222+ return mq_set;
223+ }
224+ let consumer_group_info = self
225+ . consumer_manager
226+ . get_consumer_group_info ( consumer_group) ;
227+ let mut cid_all = if let Some ( consumer_group_info) = consumer_group_info {
228+ consumer_group_info. get_all_client_ids ( )
229+ } else {
230+ vec ! [ ]
231+ } ;
232+ if cid_all. is_empty ( ) {
233+ warn ! (
234+ "QueryLoad: no assignment for group[{}] topic[{}], get consumer id list \
235+ failed",
236+ consumer_group, topic
237+ ) ;
238+ return None ;
239+ }
240+ let mut mq_all = mq_set. unwrap ( ) . into_iter ( ) . collect :: < Vec < MessageQueue > > ( ) ;
241+ mq_all. sort ( ) ;
242+ cid_all. sort ( ) ;
243+
244+ let strategy = self . load_strategy . get ( strategy_name) ;
245+ if strategy. is_none ( ) {
246+ warn ! (
247+ "QueryLoad: unsupported strategy [{}], {}" ,
248+ strategy_name,
249+ channel. remote_address( )
250+ ) ;
251+ return None ;
252+ }
253+ let strategy = strategy. unwrap ( ) ;
254+ let result =
255+ if set_message_request_mode_request_body. mode == MessageRequestMode :: Pop {
256+ self . allocate_for_pop (
257+ strategy,
258+ consumer_group,
259+ client_id,
260+ mq_all. as_slice ( ) ,
261+ cid_all. as_slice ( ) ,
262+ set_message_request_mode_request_body. pop_share_queue_num ,
263+ )
264+ } else {
265+ match strategy. allocate (
266+ consumer_group,
267+ client_id,
268+ mq_all. as_slice ( ) ,
269+ cid_all. as_slice ( ) ,
270+ ) {
271+ Ok ( value) => Ok ( value. into_iter ( ) . collect :: < HashSet < MessageQueue > > ( ) ) ,
272+ Err ( e) => Err ( BrokerError :: ClientError ( e) ) ,
273+ }
274+ } ;
275+ match result {
276+ Ok ( value) => Some ( value) ,
277+ Err ( _) => None ,
278+ }
279+ }
280+ }
281+ }
282+
283+ pub fn allocate_for_pop (
284+ & self ,
285+ strategy : & Arc < dyn AllocateMessageQueueStrategy > ,
286+ consumer_group : & CheetahString ,
287+ current_cid : & CheetahString ,
288+ mq_all : & [ MessageQueue ] ,
289+ cid_all : & [ CheetahString ] ,
290+ pop_share_queue_num : i32 ,
291+ ) -> Result < HashSet < MessageQueue > > {
292+ if pop_share_queue_num <= 0 || pop_share_queue_num > cid_all. len ( ) as i32 - 1 {
293+ Ok ( mq_all
294+ . iter ( )
295+ . map ( |mq| {
296+ MessageQueue :: from_parts (
297+ mq. get_topic_cs ( ) . clone ( ) ,
298+ mq. get_broker_name ( ) . clone ( ) ,
299+ -1 ,
300+ )
301+ } )
302+ . collect :: < HashSet < MessageQueue > > ( ) )
303+ } else if cid_all. len ( ) <= mq_all. len ( ) {
304+ let mut allocate_result = strategy
305+ . allocate ( consumer_group, current_cid, mq_all, cid_all)
306+ . unwrap ( ) ;
307+ let index = cid_all. iter ( ) . position ( |cid| cid == current_cid) ;
308+ if let Some ( mut index) = index {
309+ for _i in 1 ..pop_share_queue_num {
310+ index += index;
311+ index %= cid_all. len ( ) ;
312+ let result = strategy
313+ . allocate ( consumer_group, & cid_all[ index] , mq_all, cid_all)
314+ . unwrap ( ) ;
315+ allocate_result. extend ( result) ;
316+ }
317+ }
318+ Ok ( allocate_result
319+ . into_iter ( )
320+ . collect :: < HashSet < MessageQueue > > ( ) )
321+ } else {
322+ allocate ( consumer_group, current_cid, mq_all, cid_all)
323+ }
90324 }
91325
92326 async fn set_message_request_mode (
@@ -117,3 +351,39 @@ impl QueryAssignmentProcessor {
117351 ) )
118352 }
119353}
354+
355+ fn allocate (
356+ consumer_group : & CheetahString ,
357+ current_cid : & CheetahString ,
358+ mq_all : & [ MessageQueue ] ,
359+ cid_all : & [ CheetahString ] ,
360+ ) -> Result < HashSet < MessageQueue > > {
361+ if current_cid. is_empty ( ) {
362+ return Err ( IllegalArgumentError ( "currentCID is empty" . to_string ( ) ) ) ;
363+ }
364+ if mq_all. is_empty ( ) {
365+ return Err ( IllegalArgumentError (
366+ "mqAll is null or mqAll empty" . to_string ( ) ,
367+ ) ) ;
368+ }
369+ if cid_all. is_empty ( ) {
370+ return Err ( IllegalArgumentError (
371+ "cidAll is null or cidAll empty" . to_string ( ) ,
372+ ) ) ;
373+ }
374+
375+ let mut result = HashSet :: new ( ) ;
376+ if !cid_all. contains ( current_cid) {
377+ log:: info!(
378+ "[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {:?}" ,
379+ consumer_group,
380+ current_cid,
381+ cid_all
382+ ) ;
383+ return Ok ( result) ;
384+ }
385+
386+ let index = cid_all. iter ( ) . position ( |cid| cid == current_cid) . unwrap ( ) ;
387+ result. insert ( mq_all[ index % mq_all. len ( ) ] . clone ( ) ) ;
388+ Ok ( result)
389+ }
0 commit comments