Skip to content

Commit c64b65a

Browse files
committed
[ISSUE #793]🔥Implement SendMessageProcessor#handleRetryAndDLQ ✨
1 parent 93135eb commit c64b65a

9 files changed

Lines changed: 728 additions & 322 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rocketmq-broker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ local-ip-address = "0.6.1"
5858
dns-lookup = "2.0"
5959
log = "0.4.22"
6060
cfg-if = { workspace = true }
61+
lazy_static.workspace = true
6162
[dev-dependencies]
6263
mockall = "0.12.1"
6364
static_assertions = { version = "1" }

rocketmq-broker/src/broker_runtime.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use crate::broker::broker_hook::BrokerShutdownHook;
5151
use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener;
5252
use crate::client::manager::consumer_manager::ConsumerManager;
5353
use crate::client::manager::producer_manager::ProducerManager;
54+
use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager;
5455
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
5556
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
5657
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
@@ -110,6 +111,7 @@ pub(crate) struct BrokerRuntime {
110111
is_isolated: Arc<AtomicBool>,
111112
#[cfg(feature = "local_file_store")]
112113
pull_request_hold_service: Option<PullRequestHoldService<DefaultMessageStore>>,
114+
rebalance_lock_manager: Arc<RebalanceLockManager>,
113115
}
114116

115117
impl Clone for BrokerRuntime {
@@ -142,6 +144,7 @@ impl Clone for BrokerRuntime {
142144
should_start_time: self.should_start_time.clone(),
143145
is_isolated: self.is_isolated.clone(),
144146
pull_request_hold_service: self.pull_request_hold_service.clone(),
147+
rebalance_lock_manager: self.rebalance_lock_manager.clone(),
145148
}
146149
}
147150
}
@@ -216,6 +219,7 @@ impl BrokerRuntime {
216219
should_start_time: Arc::new(AtomicU64::new(0)),
217220
is_isolated: Arc::new(AtomicBool::new(false)),
218221
pull_request_hold_service: None,
222+
rebalance_lock_manager: Arc::new(Default::default()),
219223
}
220224
}
221225

@@ -372,9 +376,11 @@ impl BrokerRuntime {
372376
fn init_processor(&mut self) -> BrokerRequestProcessor<DefaultMessageStore> {
373377
let send_message_processor = SendMessageProcessor::<DefaultMessageStore>::new(
374378
self.topic_queue_mapping_manager.clone(),
379+
self.subscription_group_manager.clone(),
375380
self.topic_config_manager.clone(),
376381
self.broker_config.clone(),
377382
self.message_store.as_ref().unwrap(),
383+
self.rebalance_lock_manager.clone(),
378384
);
379385
let mut pull_message_result_handler =
380386
ArcCellWrapper::new(Box::new(DefaultPullMessageResultHandler::new(

rocketmq-broker/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ pub(crate) mod consumer_ids_change_listener;
2222
pub(crate) mod default_consumer_ids_change_listener;
2323
pub(crate) mod manager;
2424
pub(crate) mod net;
25+
pub(crate) mod rebalance;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub mod rebalance_lock_manager;
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::collections::HashMap;
18+
use std::sync::atomic::AtomicI64;
19+
use std::sync::Arc;
20+
21+
use lazy_static::lazy_static;
22+
use parking_lot::RwLock;
23+
use rocketmq_common::common::message::message_queue::MessageQueue;
24+
use rocketmq_common::TimeUtils::get_current_millis;
25+
use tracing::info;
26+
use tracing::warn;
27+
28+
lazy_static! {
    /// Maximum age of a queue lock before it is considered expired.
    ///
    /// Read once, on first access, from the environment variable
    /// `rocketmq.broker.rebalance.lockMaxLiveTime`. Falls back to `60000`
    /// when the variable is unset, not valid Unicode, or not a valid `i64`.
    /// The value is compared against differences of `get_current_millis()`
    /// readings, so it is expressed in milliseconds.
    pub static ref REBALANCE_LOCK_MAX_LIVE_TIME: i64 =
        std::env::var("rocketmq.broker.rebalance.lockMaxLiveTime")
            .ok()
            .and_then(|raw| raw.parse::<i64>().ok())
            .unwrap_or(60000);
}
36+
37+
/// Lock table keyed by consumer group name; each group maps a message queue to
/// the `LockEntry` describing the lock currently recorded for that queue.
type MessageQueueLockTable = HashMap<String, HashMap<Arc<MessageQueue>, LockEntry>>;
38+
39+
#[derive(Clone, Default)]
40+
pub struct RebalanceLockManager {
41+
mq_lock_table: Arc<RwLock<MessageQueueLockTable>>,
42+
}
43+
44+
impl RebalanceLockManager {
45+
pub fn is_lock_all_expired(&self, group: &str) -> bool {
46+
let lock_table = self.mq_lock_table.read();
47+
let lock_entry = lock_table.get(group);
48+
if lock_entry.is_none() {
49+
return true;
50+
}
51+
let lock_entry = lock_entry.unwrap();
52+
for (_, entry) in lock_entry.iter() {
53+
if !entry.is_expired() {
54+
return false;
55+
}
56+
}
57+
true
58+
}
59+
60+
pub fn try_lock_batch(
61+
&self,
62+
group: &str,
63+
mqs: Vec<Arc<MessageQueue>>,
64+
client_id: &str,
65+
) -> Vec<Arc<MessageQueue>> {
66+
let mut lock_mqs = Vec::new();
67+
let mut not_locked_mqs = Vec::new();
68+
for mq in mqs.iter() {
69+
if self.is_locked(group, mq, client_id) {
70+
lock_mqs.push(mq.clone());
71+
} else {
72+
not_locked_mqs.push(mq.clone());
73+
}
74+
}
75+
if !not_locked_mqs.is_empty() {
76+
let mut write_guard = self.mq_lock_table.write();
77+
let mut group_value = write_guard.get_mut(group);
78+
if group_value.is_none() {
79+
group_value = Some(write_guard.entry(group.to_string()).or_default());
80+
}
81+
let group_value = group_value.unwrap();
82+
for mq in not_locked_mqs.iter() {
83+
let lock_entry = group_value.entry(mq.clone()).or_insert_with(|| {
84+
info!(
85+
"RebalanceLockManager#tryLockBatch: lock a message which has not been \
86+
locked yet, group={}, clientId={}, mq={:?}",
87+
group, client_id, mq
88+
);
89+
LockEntry {
90+
client_id: client_id.to_string(),
91+
last_update_timestamp: AtomicI64::new(get_current_millis() as i64),
92+
}
93+
});
94+
if lock_entry.is_locked(client_id) {
95+
lock_entry.last_update_timestamp.store(
96+
get_current_millis() as i64,
97+
std::sync::atomic::Ordering::Relaxed,
98+
);
99+
lock_mqs.push(mq.clone());
100+
continue;
101+
}
102+
let old_client_id = lock_entry.client_id.as_str().to_string();
103+
if lock_entry.is_expired() {
104+
lock_entry.client_id = client_id.to_string();
105+
lock_entry.last_update_timestamp.store(
106+
get_current_millis() as i64,
107+
std::sync::atomic::Ordering::Relaxed,
108+
);
109+
lock_mqs.push(mq.clone());
110+
warn!(
111+
"RebalanceLockManager#tryLockBatch: try to lock a expired message queue, \
112+
group={} mq={:?}, old client id={}, new client id={}",
113+
group, mq, old_client_id, client_id
114+
);
115+
continue;
116+
}
117+
warn!(
118+
"RebalanceLockManager#tryLockBatch: message queue has been locked by other \
119+
group={}, mq={:?}, locked client id={}, current client id={}",
120+
group, mq, old_client_id, client_id
121+
);
122+
}
123+
}
124+
lock_mqs
125+
}
126+
127+
pub fn unlock_batch(&self, group: &str, mqs: Vec<Arc<MessageQueue>>, client_id: &str) {
128+
let mut write_guard = self.mq_lock_table.write();
129+
let group_value = write_guard.get_mut(group);
130+
if group_value.is_none() {
131+
warn!(
132+
"RebalanceLockManager#unlockBatch: group not exist, group={}, clientId={}, \
133+
mqs={:?}",
134+
group, client_id, mqs
135+
);
136+
return;
137+
}
138+
let group_value = group_value.unwrap();
139+
for mq in mqs.iter() {
140+
let lock_entry = group_value.get(mq);
141+
if lock_entry.is_none() {
142+
warn!(
143+
"RebalanceLockManager#unlockBatch: mq not locked, group={}, clientId={}, mq={}",
144+
group, client_id, mq
145+
);
146+
continue;
147+
}
148+
let lock_entry = lock_entry.unwrap();
149+
if lock_entry.client_id == *client_id {
150+
group_value.remove(mq);
151+
info!(
152+
"RebalanceLockManager#unlockBatch: unlock a message queue, group={}, \
153+
clientId={}, mq={:?}",
154+
group, client_id, mq
155+
);
156+
} else {
157+
warn!(
158+
"RebalanceLockManager#unlockBatch: unlock a message queue, but the client id \
159+
is not matched, group={}, clientId={}, mq={:?}",
160+
group, client_id, mq
161+
);
162+
}
163+
}
164+
}
165+
166+
fn is_locked(&self, group: &str, mq: &Arc<MessageQueue>, client_id: &str) -> bool {
167+
let lock_table = self.mq_lock_table.read();
168+
let group_value = lock_table.get(group);
169+
if group_value.is_none() {
170+
return false;
171+
}
172+
let group_value = group_value.unwrap();
173+
let lock_entry = group_value.get(mq);
174+
if lock_entry.is_none() {
175+
return false;
176+
}
177+
let lock_entry = lock_entry.unwrap();
178+
let locked = lock_entry.is_locked(client_id);
179+
if locked {
180+
lock_entry.last_update_timestamp.store(
181+
get_current_millis() as i64,
182+
std::sync::atomic::Ordering::Relaxed,
183+
);
184+
}
185+
locked
186+
}
187+
}
188+
189+
/// Lock record for a single message queue.
struct LockEntry {
    /// Identifier of the client the lock is recorded for.
    client_id: String,
    /// `get_current_millis()` reading taken when the lock was last acquired
    /// or refreshed. Atomic so it can be updated through a shared reference.
    last_update_timestamp: AtomicI64,
}
193+
194+
impl LockEntry {
195+
pub fn new() -> LockEntry {
196+
Self {
197+
client_id: "".to_string(),
198+
last_update_timestamp: AtomicI64::new(get_current_millis() as i64),
199+
}
200+
}
201+
202+
#[inline]
203+
pub fn is_expired(&self) -> bool {
204+
let now = get_current_millis() as i64;
205+
let last_update_timestamp = self
206+
.last_update_timestamp
207+
.load(std::sync::atomic::Ordering::Relaxed);
208+
(now - last_update_timestamp) > *REBALANCE_LOCK_MAX_LIVE_TIME
209+
}
210+
211+
#[inline]
212+
pub fn is_locked(&self, client_id: &str) -> bool {
213+
self.client_id == client_id && !self.is_expired()
214+
}
215+
}
216+
217+
#[cfg(test)]
mod rebalance_lock_manager_tests {
    use std::sync::Arc;

    use rocketmq_common::common::message::message_queue::MessageQueue;

    use super::*;

    /// Consumer group name shared by every test in this module.
    const GROUP: &str = "test_group";

    /// Builds an empty manager together with a default message queue.
    fn fixture() -> (RebalanceLockManager, Arc<MessageQueue>) {
        (
            RebalanceLockManager::default(),
            Arc::new(MessageQueue::default()),
        )
    }

    // A group with no entries counts as "all expired".
    #[test]
    fn lock_all_expired_returns_true_when_no_locks_exist() {
        let manager = RebalanceLockManager::default();
        assert!(manager.is_lock_all_expired(GROUP));
    }

    // A freshly taken lock keeps the group from being "all expired".
    #[test]
    fn lock_all_expired_returns_false_when_active_locks_exist() {
        let (manager, mq) = fixture();
        manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_1");
        assert!(!manager.is_lock_all_expired(GROUP));
    }

    // Locking in a group that does not exist yet creates it and succeeds.
    #[test]
    fn try_lock_batch_locks_message_queues_for_new_group() {
        let (manager, mq) = fixture();
        let locked_mqs = manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_1");
        assert_eq!(locked_mqs.len(), 1);
    }

    // A second client cannot take a queue that is actively held by the first.
    #[test]
    fn try_lock_batch_does_not_lock_already_locked_message_queues() {
        let (manager, mq) = fixture();
        manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_1");
        let locked_mqs = manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_2");
        assert!(locked_mqs.is_empty());
    }

    // After the owner unlocks, another client can acquire the queue.
    #[test]
    fn unlock_batch_unlocks_message_queues_locked_by_client() {
        let (manager, mq) = fixture();
        manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_1");
        manager.unlock_batch(GROUP, vec![Arc::clone(&mq)], "client_1");
        let locked_mqs = manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_2");
        assert_eq!(locked_mqs.len(), 1);
    }

    // An unlock request from a non-owner leaves the lock in place.
    #[test]
    fn unlock_batch_does_not_unlock_message_queues_locked_by_other_clients() {
        let (manager, mq) = fixture();
        manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_1");
        manager.unlock_batch(GROUP, vec![Arc::clone(&mq)], "client_2");
        assert!(!manager.is_lock_all_expired(GROUP));
    }

    // The owner sees its own queue as locked.
    #[test]
    fn is_locked_returns_true_for_locked_message_queue() {
        let (manager, mq) = fixture();
        manager.try_lock_batch(GROUP, vec![Arc::clone(&mq)], "client_1");
        assert!(manager.is_locked(GROUP, &mq, "client_1"));
    }

    // A queue nobody has locked is reported as not locked.
    #[test]
    fn is_locked_returns_false_for_unlocked_message_queue() {
        let (manager, mq) = fixture();
        assert!(!manager.is_locked(GROUP, &mq, "client_1"));
    }
}

0 commit comments

Comments
 (0)