@@ -468,7 +468,77 @@ where
468468
469469 pub async fn lock_all ( & mut self ) {
470470 let broker_mqs = self . build_process_queue_table_by_broker_name ( ) . await ;
471- for ( broker_name, mqs) in broker_mqs {
471+
472+ let map = broker_mqs
473+ . into_iter ( )
474+ . map ( |( broker_name, mqs) | {
475+ let mut client_instance = self . client_instance . clone ( ) ;
476+ let process_queue_table = self . process_queue_table . clone ( ) ;
477+ let consumer_group = self . consumer_group . clone ( ) . unwrap ( ) ;
478+ async move {
479+ if mqs. is_empty ( ) {
480+ return ;
481+ }
482+ let client = client_instance. as_mut ( ) . unwrap ( ) ;
483+ let find_broker_result = client
484+ . find_broker_address_in_subscribe (
485+ broker_name. as_str ( ) ,
486+ mix_all:: MASTER_ID ,
487+ true ,
488+ )
489+ . await ;
490+ if let Some ( find_broker_result) = find_broker_result {
491+ let request_body = LockBatchRequestBody {
492+ consumer_group : Some ( consumer_group. to_owned ( ) ) ,
493+ client_id : Some ( client. client_id . clone ( ) ) ,
494+ mq_set : mqs. clone ( ) ,
495+ ..Default :: default ( )
496+ } ;
497+ let result = client
498+ . mq_client_api_impl
499+ . as_mut ( )
500+ . unwrap ( )
501+ . lock_batch_mq (
502+ find_broker_result. broker_addr . as_str ( ) ,
503+ request_body,
504+ 1_000 ,
505+ )
506+ . await ;
507+ match result {
508+ Ok ( lock_okmqset) => {
509+ let process_queue_table = process_queue_table. read ( ) . await ;
510+ for mq in & mqs {
511+ if let Some ( pq) = process_queue_table. get ( mq) {
512+ if lock_okmqset. contains ( mq) {
513+ if pq. is_locked ( ) {
514+ info ! (
515+ "the message queue locked OK, Group: {:?} {}" ,
516+ consumer_group, mq
517+ ) ;
518+ }
519+ pq. set_locked ( true ) ;
520+ pq. set_last_lock_timestamp ( get_current_millis ( ) ) ;
521+ } else {
522+ pq. set_locked ( false ) ;
523+ warn ! (
524+ "the message queue locked Failed, Group: {:?} {}" ,
525+ consumer_group, mq
526+ ) ;
527+ }
528+ }
529+ }
530+ }
531+ Err ( e) => {
532+ error ! ( "lockBatchMQ exception {}" , e) ;
533+ }
534+ }
535+ }
536+ }
537+ } )
538+ . collect :: < Vec < _ > > ( ) ;
539+ futures:: future:: join_all ( map) . await ;
540+
541+ /* for (broker_name, mqs) in broker_mqs {
472542 if mqs.is_empty() {
473543 continue;
474544 }
@@ -518,7 +588,7 @@ where
518588 }
519589 }
520590 }
521- }
591+ }*/
522592 }
523593
524594 async fn build_process_queue_table_by_broker_name (
@@ -533,7 +603,7 @@ where
533603 }
534604 let broker_name = client. get_broker_name_from_message_queue ( mq) . await ;
535605 let entry = result. entry ( broker_name) . or_insert ( HashSet :: new ( ) ) ;
536- entry. insert ( mq. clone ( ) ) ;
606+ entry. insert ( mq. to_owned ( ) ) ;
537607 }
538608 result
539609 }
0 commit comments