Skip to content

Commit a464f86

Browse files
committed
[ISSUE #1076]PullMessageService and RebalanceService add shutdown method
1 parent 7f98dbe commit a464f86

2 files changed

Lines changed: 57 additions & 2 deletions

File tree

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
use rocketmq_common::common::message::message_enum::MessageRequestMode;
1818
use rocketmq_common::ArcRefCellWrapper;
19+
use rocketmq_rust::Shutdown;
1920
use tracing::info;
2021
use tracing::warn;
2122

@@ -27,19 +28,25 @@ use crate::factory::mq_client_instance::MQClientInstance;
2728
#[derive(Clone)]
2829
pub struct PullMessageService {
2930
tx: Option<tokio::sync::mpsc::Sender<Box<dyn MessageRequest + Send + 'static>>>,
31+
tx_shutdown: Option<tokio::sync::broadcast::Sender<()>>,
3032
}
3133

3234
impl PullMessageService {
3335
pub fn new() -> Self {
34-
PullMessageService { tx: None }
36+
PullMessageService {
37+
tx: None,
38+
tx_shutdown: None,
39+
}
3540
}
3641
pub async fn start(&mut self, mut instance: ArcRefCellWrapper<MQClientInstance>) {
3742
let (tx, mut rx) =
3843
tokio::sync::mpsc::channel::<Box<dyn MessageRequest + Send + 'static>>(1024 * 4);
44+
let (mut shutdown, tx_shutdown) = Shutdown::new(1);
3945
self.tx = Some(tx);
46+
self.tx_shutdown = Some(tx_shutdown);
4047
tokio::spawn(async move {
4148
info!(">>>>>>>>>>>>>>>>>>>>>>>PullMessageService started<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
42-
while let Some(request) = rx.recv().await {
49+
/*while let Some(request) = rx.recv().await {
4350
if request.get_message_request_mode() == MessageRequestMode::Pull {
4451
let pull_request =
4552
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
@@ -49,6 +56,32 @@ impl PullMessageService {
4956
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
5057
PullMessageService::pop_message(pop_request, instance.as_mut()).await;
5158
}
59+
}*/
60+
if shutdown.is_shutdown() {
61+
info!("PullMessageService shutdown");
62+
return;
63+
}
64+
loop {
65+
tokio::select! {
66+
_ = shutdown.recv() => {
67+
info!("PullMessageService shutdown");
68+
}
69+
Some(request) = rx.recv() => {
70+
if request.get_message_request_mode() == MessageRequestMode::Pull {
71+
let pull_request =
72+
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
73+
PullMessageService::pull_message(pull_request, instance.as_mut()).await;
74+
} else {
75+
let pop_request =
76+
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
77+
PullMessageService::pop_message(pop_request, instance.as_mut()).await;
78+
}
79+
}
80+
}
81+
if shutdown.is_shutdown() {
82+
info!("PullMessageService shutdown");
83+
break;
84+
}
5285
}
5386
});
5487
}
@@ -106,4 +139,10 @@ impl PullMessageService {
106139
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
107140
}
108141
}
142+
143+
pub fn shutdown(&self) {
144+
if let Err(e) = self.tx_shutdown.as_ref().unwrap().send(()) {
145+
warn!("Failed to send shutdown signal to pull_tx, error: {:?}", e);
146+
}
147+
}
109148
}

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_service.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ use std::time::Duration;
1919

2020
use once_cell::sync::Lazy;
2121
use rocketmq_common::ArcRefCellWrapper;
22+
use rocketmq_rust::Shutdown;
2223
use tokio::select;
2324
use tokio::sync::Notify;
2425
use tokio::time::Instant;
2526
use tracing::info;
27+
use tracing::warn;
2628

2729
use crate::factory::mq_client_instance::MQClientInstance;
2830

@@ -47,17 +49,21 @@ static MIN_INTERVAL: Lazy<Duration> = Lazy::new(|| {
4749
#[derive(Clone)]
4850
pub struct RebalanceService {
4951
notify: Arc<Notify>,
52+
tx_shutdown: Option<tokio::sync::broadcast::Sender<()>>,
5053
}
5154

5255
impl RebalanceService {
5356
pub fn new() -> Self {
5457
RebalanceService {
5558
notify: Arc::new(Notify::new()),
59+
tx_shutdown: None,
5660
}
5761
}
5862

5963
pub async fn start(&mut self, mut instance: ArcRefCellWrapper<MQClientInstance>) {
6064
let notify = self.notify.clone();
65+
let (mut shutdown, tx_shutdown) = Shutdown::new(1);
66+
self.tx_shutdown = Some(tx_shutdown);
6167
tokio::spawn(async move {
6268
let mut last_rebalance_timestamp = Instant::now();
6369
let min_interval = *MIN_INTERVAL;
@@ -66,8 +72,12 @@ impl RebalanceService {
6672
loop {
6773
select! {
6874
_ = notify.notified() => {}
75+
_ = shutdown.recv() => {info!("RebalanceService shutdown");}
6976
_ = tokio::time::sleep(real_wait_interval) => {}
7077
}
78+
if shutdown.is_shutdown() {
79+
return;
80+
}
7181
let interval = Instant::now() - last_rebalance_timestamp;
7282
if interval < min_interval {
7383
real_wait_interval = min_interval - interval;
@@ -87,4 +97,10 @@ impl RebalanceService {
8797
pub fn wakeup(&self) {
8898
self.notify.notify_waiters();
8999
}
100+
101+
pub fn shutdown(&self) {
102+
if let Err(e) = self.tx_shutdown.as_ref().unwrap().send(()) {
103+
warn!("Failed to send shutdown signal to pull_tx, error: {:?}", e);
104+
}
105+
}
90106
}

0 commit comments

Comments
 (0)