@@ -35,6 +35,7 @@ use rocketmq_common::common::mix_all::is_lmq;
3535use rocketmq_common:: common:: mix_all:: is_sys_consumer_group_for_no_cold_read_limit;
3636use rocketmq_common:: common:: mix_all:: MULTI_DISPATCH_QUEUE_SPLITTER ;
3737use rocketmq_common:: common:: mix_all:: RETRY_GROUP_TOPIC_PREFIX ;
38+ use rocketmq_common:: utils:: util_all;
3839use rocketmq_common:: CleanupPolicyUtils :: get_delete_policy;
3940use rocketmq_common:: TimeUtils :: get_current_millis;
4041use rocketmq_common:: {
@@ -67,6 +68,8 @@ use crate::base::store_checkpoint::StoreCheckpoint;
6768use crate :: base:: store_stats_service:: StoreStatsService ;
6869use crate :: config:: broker_role:: BrokerRole ;
6970use crate :: config:: message_store_config:: MessageStoreConfig ;
71+ use crate :: config:: store_path_config_helper:: get_store_path_batch_consume_queue;
72+ use crate :: config:: store_path_config_helper:: get_store_path_consume_queue_ext;
7073use crate :: filter:: MessageFilter ;
7174use crate :: hook:: put_message_hook:: BoxedPutMessageHook ;
7275use crate :: index:: index_dispatch:: CommitLogDispatcherBuildIndex ;
@@ -1040,22 +1043,50 @@ impl MessageStore for DefaultMessageStore {
10401043 )
10411044 }
10421045
1043- fn delete_topics ( & self , delete_topics : Vec < String > ) {
1046+ fn delete_topics ( & mut self , delete_topics : Vec < & str > ) -> i32 {
10441047 if delete_topics. is_empty ( ) {
1045- return ;
1048+ return 0 ;
10461049 }
1050+ let mut delete_count = 0 ;
10471051 for topic in delete_topics {
1048- let queue_table = self
1049- . consume_queue_store
1050- . find_consume_queue_map ( topic. as_str ( ) ) ;
1052+ let queue_table = self . consume_queue_store . find_consume_queue_map ( topic) ;
10511053 if queue_table. is_none ( ) {
10521054 continue ;
10531055 }
1054- /* for (queue_id, consume_queue) in queue_table.unwrap() {
1055- consume_queue.lock().destroy();
1056- self.consume_queue_store.delete_queue(topic.as_str(), *queue_id);
1057- }*/
1056+ let queue_table = queue_table. unwrap ( ) ;
1057+ for ( queue_id, consume_queue) in queue_table {
1058+ self . consume_queue_store
1059+ . destroy_consume_queue ( consume_queue. as_ref ( ) . as_ref ( ) ) ;
1060+ self . consume_queue_store
1061+ . remove_topic_queue_table ( topic, queue_id) ;
1062+ }
1063+ // remove topic from cq table
1064+ let consume_queue_table = self . consume_queue_store . get_consume_queue_table ( ) ;
1065+ consume_queue_table. lock ( ) . remove ( topic) ;
1066+
1067+ if self . broker_config . auto_delete_unused_stats {
1068+ self . broker_stats_manager
1069+ . as_ref ( )
1070+ . unwrap ( )
1071+ . on_topic_deleted ( topic) ;
1072+ }
1073+
1074+ let root_dir = self . message_store_config . store_path_root_dir . as_str ( ) ;
1075+ let consume_queue_dir =
1076+ PathBuf :: from ( get_store_path_consume_queue ( root_dir) ) . join ( topic) ;
1077+ let consume_queue_ext_dir =
1078+ PathBuf :: from ( get_store_path_consume_queue_ext ( root_dir) ) . join ( topic) ;
1079+ let batch_consume_queue_dir =
1080+ PathBuf :: from ( get_store_path_batch_consume_queue ( root_dir) ) . join ( topic) ;
1081+
1082+ util_all:: delete_empty_directory ( consume_queue_dir) ;
1083+ util_all:: delete_empty_directory ( consume_queue_ext_dir) ;
1084+ util_all:: delete_empty_directory ( batch_consume_queue_dir) ;
1085+ info ! ( "DeleteTopic: Topic has been destroyed, topic={}" , topic) ;
1086+ delete_count += 1 ;
10581087 }
1088+
1089+ delete_count
10591090 }
10601091}
10611092
0 commit comments