Skip to content

Commit 6561ec9

Browse files
committed
optimize code
1 parent 1c7dbe0 commit 6561ec9

8 files changed

Lines changed: 109 additions & 12 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,7 @@ cfg-if = "1.0.0"
7676
sysinfo = "0.31.4"
7777
uuid = { version = "1.10.0", features = ["v4", # Lets you generate random UUIDs
7878
"fast-rng", # Use a faster (but still sufficiently random) RNG
79-
"macro-diagnostics", ] }
79+
"macro-diagnostics", ] }
80+
81+
82+
futures = "0.3"

rocketmq-client/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ regex = { version = "1.10.6", features = [] }
3939
parking_lot = { workspace = true }
4040
once_cell = { workspace = true }
4141
bytes = { workspace = true }
42+
43+
44+
futures = { workspace = true }
4245
[[example]]
4346
name = "simple-producer"
4447
path = "examples/producer/simple_producer.rs"

rocketmq-client/src/consumer/consumer_impl/re_balance.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub trait RebalanceLocal {
5555

5656
fn consume_type(&self) -> ConsumeType;
5757

58-
async fn remove_dirty_offset(&self, mq: &MessageQueue);
58+
async fn remove_dirty_offset(&mut self, mq: &MessageQueue);
5959

6060
async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result<i64>;
6161

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,77 @@ where
468468

469469
pub async fn lock_all(&mut self) {
470470
let broker_mqs = self.build_process_queue_table_by_broker_name().await;
471-
for (broker_name, mqs) in broker_mqs {
471+
472+
let map = broker_mqs
473+
.into_iter()
474+
.map(|(broker_name, mqs)| {
475+
let mut client_instance = self.client_instance.clone();
476+
let process_queue_table = self.process_queue_table.clone();
477+
let consumer_group = self.consumer_group.clone().unwrap();
478+
async move {
479+
if mqs.is_empty() {
480+
return;
481+
}
482+
let client = client_instance.as_mut().unwrap();
483+
let find_broker_result = client
484+
.find_broker_address_in_subscribe(
485+
broker_name.as_str(),
486+
mix_all::MASTER_ID,
487+
true,
488+
)
489+
.await;
490+
if let Some(find_broker_result) = find_broker_result {
491+
let request_body = LockBatchRequestBody {
492+
consumer_group: Some(consumer_group.to_owned()),
493+
client_id: Some(client.client_id.clone()),
494+
mq_set: mqs.clone(),
495+
..Default::default()
496+
};
497+
let result = client
498+
.mq_client_api_impl
499+
.as_mut()
500+
.unwrap()
501+
.lock_batch_mq(
502+
find_broker_result.broker_addr.as_str(),
503+
request_body,
504+
1_000,
505+
)
506+
.await;
507+
match result {
508+
Ok(lock_okmqset) => {
509+
let process_queue_table = process_queue_table.read().await;
510+
for mq in &mqs {
511+
if let Some(pq) = process_queue_table.get(mq) {
512+
if lock_okmqset.contains(mq) {
513+
if pq.is_locked() {
514+
info!(
515+
"the message queue locked OK, Group: {:?} {}",
516+
consumer_group, mq
517+
);
518+
}
519+
pq.set_locked(true);
520+
pq.set_last_lock_timestamp(get_current_millis());
521+
} else {
522+
pq.set_locked(false);
523+
warn!(
524+
"the message queue locked Failed, Group: {:?} {}",
525+
consumer_group, mq
526+
);
527+
}
528+
}
529+
}
530+
}
531+
Err(e) => {
532+
error!("lockBatchMQ exception {}", e);
533+
}
534+
}
535+
}
536+
}
537+
})
538+
.collect::<Vec<_>>();
539+
futures::future::join_all(map).await;
540+
541+
/* for (broker_name, mqs) in broker_mqs {
472542
if mqs.is_empty() {
473543
continue;
474544
}
@@ -518,7 +588,7 @@ where
518588
}
519589
}
520590
}
521-
}
591+
}*/
522592
}
523593

524594
async fn build_process_queue_table_by_broker_name(
@@ -533,7 +603,7 @@ where
533603
}
534604
let broker_name = client.get_broker_name_from_message_queue(mq).await;
535605
let entry = result.entry(broker_name).or_insert(HashSet::new());
536-
entry.insert(mq.clone());
606+
entry.insert(mq.to_owned());
537607
}
538608
result
539609
}

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,18 @@ impl RebalancePushImpl {
151151
return true;
152152
} else {
153153
pq.inc_try_unlock_times();
154+
warn!(
155+
"Failed to acquire consume_lock for {}, incrementing try_unlock_times.",
156+
mq
157+
);
154158
}
155159
}
156160
false
157161
}
158162
}
159163

164+
const UNLOCK_BATCH_MQ_TIMEOUT_MS: u64 = 1_000;
165+
160166
impl Rebalance for RebalancePushImpl {
161167
async fn message_queue_changed(
162168
&mut self,
@@ -244,7 +250,7 @@ impl Rebalance for RebalancePushImpl {
244250
ConsumeType::ConsumePassively
245251
}
246252

247-
async fn remove_dirty_offset(&self, mq: &MessageQueue) {
253+
async fn remove_dirty_offset(&mut self, mq: &MessageQueue) {
248254
if let Some(mut default_mqpush_consumer_impl) = self
249255
.default_mqpush_consumer_impl
250256
.as_ref()
@@ -441,7 +447,13 @@ impl Rebalance for RebalancePushImpl {
441447
}
442448

443449
async fn unlock(&mut self, mq: &MessageQueue, oneway: bool) {
444-
let client = self.rebalance_impl_inner.client_instance.as_mut().unwrap();
450+
let client = match self.rebalance_impl_inner.client_instance.as_mut() {
451+
Some(client) => client,
452+
None => {
453+
warn!("Client instance is not available.");
454+
return;
455+
}
456+
};
445457
let broker_name = client.get_broker_name_from_message_queue(mq).await;
446458
let find_broker_result = client
447459
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
@@ -460,7 +472,7 @@ impl Rebalance for RebalancePushImpl {
460472
.unlock_batch_mq(
461473
find_broker_result.broker_addr.as_str(),
462474
request_body,
463-
1_000,
475+
UNLOCK_BATCH_MQ_TIMEOUT_MS,
464476
oneway,
465477
)
466478
.await;

rocketmq-client/src/implementation/mq_client_api_impl.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,9 +1106,17 @@ impl MQClientAPIImpl {
11061106
)
11071107
.await?;
11081108
if ResponseCode::from(response.code()) == ResponseCode::Success {
1109-
LockBatchResponseBody::decode(response.body().as_ref().unwrap().as_ref())
1110-
.map(|body| body.lock_ok_mq_set)
1111-
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string()))
1109+
if let Some(body) = response.body() {
1110+
LockBatchResponseBody::decode(body.as_ref())
1111+
.map(|body| body.lock_ok_mq_set)
1112+
.map_err(|e| MQBrokerError(response.code(), e.to_string(), addr.to_string()))
1113+
} else {
1114+
Err(MQBrokerError(
1115+
response.code(),
1116+
"Response body is empty".to_string(),
1117+
addr.to_string(),
1118+
))
1119+
}
11121120
} else {
11131121
Err(MQBrokerError(
11141122
response.code(),

rocketmq-remoting/src/protocol/body/request/lock_batch_request_body.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl Display for LockBatchRequestBody {
3434
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3535
write!(
3636
f,
37-
"UnlockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \
37+
"LockBatchRequestBody [consumer_group={}, client_id={}, only_this_broker={}, \
3838
mq_set={:?}]",
3939
self.consumer_group.as_ref().unwrap_or(&"".to_string()),
4040
self.client_id.as_ref().unwrap_or(&"".to_string()),

0 commit comments

Comments
 (0)