|
14 | 14 | * See the License for the specific language governing permissions and |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
| 17 | +use std::sync::atomic::AtomicBool; |
| 18 | +use std::sync::atomic::AtomicU64; |
| 19 | +use std::sync::atomic::Ordering; |
| 20 | + |
17 | 21 | use rocketmq_common::common::pop_ack_constants::PopAckConstants; |
| 22 | +use rocketmq_common::TimeUtils::get_current_millis; |
18 | 23 | use rocketmq_remoting::code::request_code::RequestCode; |
19 | 24 | use rocketmq_remoting::net::channel::Channel; |
20 | 25 | use rocketmq_remoting::protocol::remoting_command::RemotingCommand; |
@@ -97,6 +102,46 @@ impl PopMessageProcessor { |
97 | 102 | } |
98 | 103 | } |
99 | 104 |
|
| 105 | +struct TimedLock { |
| 106 | + lock: AtomicBool, |
| 107 | + lock_time: AtomicU64, |
| 108 | +} |
| 109 | + |
| 110 | +impl TimedLock { |
| 111 | + pub fn new() -> Self { |
| 112 | + TimedLock { |
| 113 | + lock: AtomicBool::new(false), |
| 114 | + lock_time: AtomicU64::new(get_current_millis()), |
| 115 | + } |
| 116 | + } |
| 117 | + |
| 118 | + pub fn try_lock(&self) -> bool { |
| 119 | + match self |
| 120 | + .lock |
| 121 | + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) |
| 122 | + { |
| 123 | + Ok(_) => { |
| 124 | + self.lock_time |
| 125 | + .store(get_current_millis(), Ordering::Relaxed); |
| 126 | + true |
| 127 | + } |
| 128 | + Err(_) => false, |
| 129 | + } |
| 130 | + } |
| 131 | + |
| 132 | + pub fn unlock(&self) { |
| 133 | + self.lock.store(false, Ordering::Release); |
| 134 | + } |
| 135 | + |
| 136 | + pub fn is_locked(&self) -> bool { |
| 137 | + self.lock.load(Ordering::Acquire) |
| 138 | + } |
| 139 | + |
| 140 | + pub fn get_lock_time(&self) -> u64 { |
| 141 | + self.lock_time.load(Ordering::Relaxed) |
| 142 | + } |
| 143 | +} |
| 144 | + |
100 | 145 | #[cfg(test)] |
101 | 146 | mod tests { |
102 | 147 | use cheetah_string::CheetahString; |
@@ -159,4 +204,41 @@ mod tests { |
159 | 204 | let expected = "test_topic@1@456@test_cid@789@test_broker@ck"; |
160 | 205 | assert_eq!(result, expected); |
161 | 206 | } |
| 207 | + |
| 208 | + #[test] |
| 209 | + fn new_timed_lock_is_unlocked() { |
| 210 | + let lock = TimedLock::new(); |
| 211 | + assert!(!lock.is_locked()); |
| 212 | + } |
| 213 | + |
| 214 | + #[test] |
| 215 | + fn try_lock_locks_successfully() { |
| 216 | + let lock = TimedLock::new(); |
| 217 | + assert!(lock.try_lock()); |
| 218 | + assert!(lock.is_locked()); |
| 219 | + } |
| 220 | + |
| 221 | + #[test] |
| 222 | + fn try_lock_fails_when_already_locked() { |
| 223 | + let lock = TimedLock::new(); |
| 224 | + lock.try_lock(); |
| 225 | + assert!(!lock.try_lock()); |
| 226 | + } |
| 227 | + |
| 228 | + #[test] |
| 229 | + fn unlock_unlocks_successfully() { |
| 230 | + let lock = TimedLock::new(); |
| 231 | + lock.try_lock(); |
| 232 | + lock.unlock(); |
| 233 | + assert!(!lock.is_locked()); |
| 234 | + } |
| 235 | + |
| 236 | + #[test] |
| 237 | + fn get_lock_time_returns_correct_time() { |
| 238 | + let lock = TimedLock::new(); |
| 239 | + let initial_time = lock.get_lock_time(); |
| 240 | + lock.try_lock(); |
| 241 | + let lock_time = lock.get_lock_time(); |
| 242 | + assert!(lock_time >= initial_time); |
| 243 | + } |
162 | 244 | } |
0 commit comments