Skip to content

Commit aaae950

Browse files
committed
[ISSUE #773]🔥Optimize Topic request handle⚡️
1 parent 5c5ed6b commit aaae950

8 files changed

Lines changed: 82 additions & 18 deletions

File tree

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ impl TopicRequestHandler {
388388
Some(response)
389389
}
390390

391-
fn delete_topic_in_broker(&self, topic: &str) {
391+
fn delete_topic_in_broker(&mut self, topic: &str) {
392392
self.inner.topic_config_manager.delete_topic_config(topic);
393393
self.inner.topic_queue_mapping_manager.delete(topic);
394394
self.inner
@@ -397,8 +397,6 @@ impl TopicRequestHandler {
397397
self.inner
398398
.pop_inflight_message_counter
399399
.clear_in_flight_message_num_by_topic_name(topic);
400-
self.inner
401-
.default_message_store
402-
.delete_topics(vec![topic.to_string()]);
400+
self.inner.default_message_store.delete_topics(vec![topic]);
403401
}
404402
}

rocketmq-common/src/common/broker/broker_config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ pub struct BrokerConfig {
162162
pub bit_map_length_consume_queue_ext: i32,
163163
pub validate_system_topic_when_update_topic: bool,
164164
pub enable_mixed_message_type: bool,
165+
pub auto_delete_unused_stats: bool,
165166
pub forward_timeout: u64,
166167
}
167168

@@ -237,6 +238,7 @@ impl Default for BrokerConfig {
237238
forward_timeout: 3 * 1000,
238239
validate_system_topic_when_update_topic: true,
239240
enable_mixed_message_type: false,
241+
auto_delete_unused_stats: false,
240242
}
241243
}
242244
}

rocketmq-common/src/utils/util_all.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::env;
1919
use std::fs;
20+
use std::io;
2021
use std::path::Path;
2122
use std::path::PathBuf;
2223
use std::time::Duration;
@@ -250,6 +251,27 @@ pub fn compute_next_morning_time_millis() -> u64 {
250251
next_morning.timestamp_millis() as u64
251252
}
252253

254+
pub fn delete_empty_directory<P: AsRef<Path>>(path: P) {
255+
let path = path.as_ref();
256+
if !path.exists() {
257+
return;
258+
}
259+
if !path.is_dir() {
260+
return;
261+
}
262+
match fs::read_dir(path) {
263+
Ok(entries) => {
264+
if entries.count() == 0 {
265+
match fs::remove_dir(path) {
266+
Ok(_) => info!("delete empty directory, {}", path.display()),
267+
Err(e) => error!("Error deleting directory: {}", e),
268+
}
269+
}
270+
}
271+
Err(e) => error!("Error reading directory: {}", e),
272+
}
273+
}
274+
253275
#[cfg(test)]
254276
mod tests {
255277
use std::time::Instant;

rocketmq-store/src/log_file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,5 @@ pub trait RocketMQMessageStore: Clone + 'static {
115115

116116
fn find_consume_queue(&self, topic: &str, queue_id: i32) -> Option<ArcConsumeQueue>;
117117

118-
fn delete_topics(&self, delete_topics: Vec<String>);
118+
fn delete_topics(&mut self, delete_topics: Vec<&str>) -> i32;
119119
}

rocketmq-store/src/message_store/default_message_store.rs

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use rocketmq_common::common::mix_all::is_lmq;
3535
use rocketmq_common::common::mix_all::is_sys_consumer_group_for_no_cold_read_limit;
3636
use rocketmq_common::common::mix_all::MULTI_DISPATCH_QUEUE_SPLITTER;
3737
use rocketmq_common::common::mix_all::RETRY_GROUP_TOPIC_PREFIX;
38+
use rocketmq_common::utils::util_all;
3839
use rocketmq_common::CleanupPolicyUtils::get_delete_policy;
3940
use rocketmq_common::TimeUtils::get_current_millis;
4041
use rocketmq_common::{
@@ -67,6 +68,8 @@ use crate::base::store_checkpoint::StoreCheckpoint;
6768
use crate::base::store_stats_service::StoreStatsService;
6869
use crate::config::broker_role::BrokerRole;
6970
use crate::config::message_store_config::MessageStoreConfig;
71+
use crate::config::store_path_config_helper::get_store_path_batch_consume_queue;
72+
use crate::config::store_path_config_helper::get_store_path_consume_queue_ext;
7073
use crate::filter::MessageFilter;
7174
use crate::hook::put_message_hook::BoxedPutMessageHook;
7275
use crate::index::index_dispatch::CommitLogDispatcherBuildIndex;
@@ -1040,22 +1043,50 @@ impl MessageStore for DefaultMessageStore {
10401043
)
10411044
}
10421045

1043-
fn delete_topics(&self, delete_topics: Vec<String>) {
1046+
fn delete_topics(&mut self, delete_topics: Vec<&str>) -> i32 {
10441047
if delete_topics.is_empty() {
1045-
return;
1048+
return 0;
10461049
}
1050+
let mut delete_count = 0;
10471051
for topic in delete_topics {
1048-
let queue_table = self
1049-
.consume_queue_store
1050-
.find_consume_queue_map(topic.as_str());
1052+
let queue_table = self.consume_queue_store.find_consume_queue_map(topic);
10511053
if queue_table.is_none() {
10521054
continue;
10531055
}
1054-
/* for (queue_id, consume_queue) in queue_table.unwrap() {
1055-
consume_queue.lock().destroy();
1056-
self.consume_queue_store.delete_queue(topic.as_str(), *queue_id);
1057-
}*/
1056+
let queue_table = queue_table.unwrap();
1057+
for (queue_id, consume_queue) in queue_table {
1058+
self.consume_queue_store
1059+
.destroy_consume_queue(consume_queue.as_ref().as_ref());
1060+
self.consume_queue_store
1061+
.remove_topic_queue_table(topic, queue_id);
1062+
}
1063+
// remove topic from cq table
1064+
let consume_queue_table = self.consume_queue_store.get_consume_queue_table();
1065+
consume_queue_table.lock().remove(topic);
1066+
1067+
if self.broker_config.auto_delete_unused_stats {
1068+
self.broker_stats_manager
1069+
.as_ref()
1070+
.unwrap()
1071+
.on_topic_deleted(topic);
1072+
}
1073+
1074+
let root_dir = self.message_store_config.store_path_root_dir.as_str();
1075+
let consume_queue_dir =
1076+
PathBuf::from(get_store_path_consume_queue(root_dir)).join(topic);
1077+
let consume_queue_ext_dir =
1078+
PathBuf::from(get_store_path_consume_queue_ext(root_dir)).join(topic);
1079+
let batch_consume_queue_dir =
1080+
PathBuf::from(get_store_path_batch_consume_queue(root_dir)).join(topic);
1081+
1082+
util_all::delete_empty_directory(consume_queue_dir);
1083+
util_all::delete_empty_directory(consume_queue_ext_dir);
1084+
util_all::delete_empty_directory(batch_consume_queue_dir);
1085+
info!("DeleteTopic: Topic has been destroyed, topic={}", topic);
1086+
delete_count += 1;
10581087
}
1088+
1089+
delete_count
10591090
}
10601091
}
10611092

rocketmq-store/src/queue.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717
use std::collections::HashMap;
18+
use std::sync::Arc;
1819

1920
use rocketmq_common::common::attribute::cq_type::CQType;
2021
use rocketmq_common::common::boundary_type::BoundaryType;
@@ -284,6 +285,8 @@ pub trait ConsumeQueueStoreTrait: Send + Sync {
284285
fn get_min_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64;
285286

286287
fn get_max_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64;
288+
289+
fn get_consume_queue_table(&self) -> Arc<ConsumeQueueTable>;
287290
}
288291

289292
/// Trait representing ConsumeQueueInterface.

rocketmq-store/src/queue/local_file_consume_queue_store.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ struct Inner {
5858
pub(crate) message_store_config: Arc<MessageStoreConfig>,
5959
pub(crate) broker_config: Arc<BrokerConfig>,
6060
pub(crate) queue_offset_operator: QueueOffsetOperator,
61-
pub(crate) consume_queue_table: ConsumeQueueTable,
61+
pub(crate) consume_queue_table: Arc<ConsumeQueueTable>,
6262
}
6363

6464
impl Inner {
@@ -85,7 +85,7 @@ impl ConsumeQueueStore {
8585
message_store_config,
8686
broker_config,
8787
queue_offset_operator: QueueOffsetOperator::new(),
88-
consume_queue_table: parking_lot::Mutex::new(HashMap::new()),
88+
consume_queue_table: Arc::new(parking_lot::Mutex::new(HashMap::new())),
8989
}),
9090
running_flags,
9191
store_checkpoint,
@@ -151,7 +151,9 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
151151
}
152152

153153
fn destroy_consume_queue(&self, consume_queue: &dyn ConsumeQueueTrait) {
154-
todo!()
154+
let mut file_queue_life_cycle =
155+
self.get_life_cycle(consume_queue.get_topic(), consume_queue.get_queue_id());
156+
file_queue_life_cycle.destroy();
155157
}
156158

157159
fn flush(&self, consume_queue: &dyn ConsumeQueueTrait, flush_least_pages: i32) -> bool {
@@ -282,7 +284,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
282284
}
283285

284286
fn remove_topic_queue_table(&mut self, topic: &str, queue_id: i32) {
285-
todo!()
287+
self.inner.queue_offset_operator.remove(topic, queue_id);
286288
}
287289

288290
fn get_topic_queue_table(&self) -> HashMap<String, i64> {
@@ -383,6 +385,10 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
383385
fn get_max_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64 {
384386
todo!()
385387
}
388+
389+
fn get_consume_queue_table(&self) -> Arc<ConsumeQueueTable> {
390+
self.inner.consume_queue_table.clone()
391+
}
386392
}
387393

388394
impl ConsumeQueueStore {

rocketmq-store/src/stats/broker_stats_manager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,8 @@ impl BrokerStatsManager {
485485

486486
pub fn inc_group_ack_nums(&self, group: &str, topic: &str, inc_value: i32) {}
487487
pub fn inc_broker_get_nums(&self, group: &str, inc_value: i32) {}
488+
489+
pub fn on_topic_deleted(&self, topic: &str) {}
488490
}
489491

490492
pub fn create_statistics_kind_meta(

0 commit comments

Comments
 (0)