Skip to content

Commit eddc45d

Browse files
authored
[ISSUE #1735]🚀Implement TimedLock function (#1736)
1 parent 266803e commit eddc45d

1 file changed

Lines changed: 82 additions & 0 deletions

File tree

rocketmq-broker/src/processor/pop_message_processor.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::sync::atomic::AtomicBool;
18+
use std::sync::atomic::AtomicU64;
19+
use std::sync::atomic::Ordering;
20+
1721
use rocketmq_common::common::pop_ack_constants::PopAckConstants;
22+
use rocketmq_common::TimeUtils::get_current_millis;
1823
use rocketmq_remoting::code::request_code::RequestCode;
1924
use rocketmq_remoting::net::channel::Channel;
2025
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
@@ -97,6 +102,46 @@ impl PopMessageProcessor {
97102
}
98103
}
99104

105+
struct TimedLock {
106+
lock: AtomicBool,
107+
lock_time: AtomicU64,
108+
}
109+
110+
impl TimedLock {
111+
pub fn new() -> Self {
112+
TimedLock {
113+
lock: AtomicBool::new(false),
114+
lock_time: AtomicU64::new(get_current_millis()),
115+
}
116+
}
117+
118+
pub fn try_lock(&self) -> bool {
119+
match self
120+
.lock
121+
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
122+
{
123+
Ok(_) => {
124+
self.lock_time
125+
.store(get_current_millis(), Ordering::Relaxed);
126+
true
127+
}
128+
Err(_) => false,
129+
}
130+
}
131+
132+
pub fn unlock(&self) {
133+
self.lock.store(false, Ordering::Release);
134+
}
135+
136+
pub fn is_locked(&self) -> bool {
137+
self.lock.load(Ordering::Acquire)
138+
}
139+
140+
pub fn get_lock_time(&self) -> u64 {
141+
self.lock_time.load(Ordering::Relaxed)
142+
}
143+
}
144+
100145
#[cfg(test)]
101146
mod tests {
102147
use cheetah_string::CheetahString;
@@ -159,4 +204,41 @@ mod tests {
159204
let expected = "test_topic@1@456@test_cid@789@test_broker@ck";
160205
assert_eq!(result, expected);
161206
}
207+
208+
#[test]
209+
fn new_timed_lock_is_unlocked() {
210+
let lock = TimedLock::new();
211+
assert!(!lock.is_locked());
212+
}
213+
214+
#[test]
215+
fn try_lock_locks_successfully() {
216+
let lock = TimedLock::new();
217+
assert!(lock.try_lock());
218+
assert!(lock.is_locked());
219+
}
220+
221+
#[test]
222+
fn try_lock_fails_when_already_locked() {
223+
let lock = TimedLock::new();
224+
lock.try_lock();
225+
assert!(!lock.try_lock());
226+
}
227+
228+
#[test]
229+
fn unlock_unlocks_successfully() {
230+
let lock = TimedLock::new();
231+
lock.try_lock();
232+
lock.unlock();
233+
assert!(!lock.is_locked());
234+
}
235+
236+
#[test]
237+
fn get_lock_time_returns_correct_time() {
238+
let lock = TimedLock::new();
239+
let initial_time = lock.get_lock_time();
240+
lock.try_lock();
241+
let lock_time = lock.get_lock_time();
242+
assert!(lock_time >= initial_time);
243+
}
162244
}

0 commit comments

Comments
 (0)