Skip to content

Commit 09495a7

Browse files
committed
[ISSUE #1069]Add Shutdown struct
1 parent 2a9a342 commit 09495a7

2 files changed

Lines changed: 93 additions & 0 deletions

File tree

rocketmq/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
*/
1717
pub mod count_down_latch;
1818
pub mod rocketmq_tokio_lock;
19+
mod shutdown;
1920

2021
pub use count_down_latch::CountDownLatch;
2122
/// Re-export rocketmq main.
2223
pub use rocketmq::main;
2324
pub use rocketmq_tokio_lock::RocketMQTokioMutex;
2425
pub use rocketmq_tokio_lock::RocketMQTokioRwLock;
26+
pub use shutdown::Shutdown;
2527
/// Re-export tokio module.
2628
pub use tokio as rocketmq;
2729

rocketmq/src/shutdown.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use tokio::sync::broadcast;
18+
19+
pub struct Shutdown<T> {
20+
/// `true` if the shutdown signal has been received
21+
is_shutdown: bool,
22+
23+
/// The receiver half of the channel used to listen for shutdown.
24+
notify: broadcast::Receiver<T>,
25+
}
26+
27+
impl<T> Shutdown<T>
28+
where
29+
T: Clone,
30+
{
31+
/// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
32+
pub fn new(notify: broadcast::Receiver<T>) -> Shutdown<T> {
33+
Shutdown {
34+
is_shutdown: false,
35+
notify,
36+
}
37+
}
38+
39+
/// Returns `true` if the shutdown signal has been received.
40+
pub fn is_shutdown(&self) -> bool {
41+
self.is_shutdown
42+
}
43+
44+
/// Receive the shutdown notice, waiting if necessary.
45+
pub async fn recv(&mut self) {
46+
// If the shutdown signal has already been received, then return
47+
// immediately.
48+
if self.is_shutdown {
49+
return;
50+
}
51+
52+
// Cannot receive a "lag error" as only one value is ever sent.
53+
let _ = self.notify.recv().await;
54+
55+
// Remember that the signal has been received.
56+
self.is_shutdown = true;
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use tokio::sync::broadcast;
63+
64+
use super::*;
65+
66+
#[tokio::test]
67+
async fn shutdown_initial_state() {
68+
let (_, rx) = broadcast::channel::<()>(1);
69+
let shutdown = Shutdown::new(rx);
70+
assert!(!shutdown.is_shutdown());
71+
}
72+
73+
#[tokio::test]
74+
async fn shutdown_signal_received() {
75+
let (tx, rx) = broadcast::channel::<()>(1);
76+
let mut shutdown = Shutdown::new(rx);
77+
tx.send(()).unwrap();
78+
shutdown.recv().await;
79+
assert!(shutdown.is_shutdown());
80+
}
81+
82+
#[tokio::test]
83+
async fn shutdown_signal_already_received() {
84+
let (tx, rx) = broadcast::channel::<()>(1);
85+
let mut shutdown = Shutdown::new(rx);
86+
tx.send(()).unwrap();
87+
shutdown.recv().await;
88+
shutdown.recv().await;
89+
assert!(shutdown.is_shutdown());
90+
}
91+
}

0 commit comments

Comments
 (0)