Skip to content

Commit f96d323

Browse files
authored
[ISSUE #981]šŸ”„Optimize client clusting consumeāš”ļø (#985)
1 parent ba55978 commit f96d323

17 files changed

Lines changed: 267 additions & 77 deletions

ā€Žrocketmq-client/examples/quickstart/consumer.rsā€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub const TAG: &str = "*";
3232
#[rocketmq::main]
3333
pub async fn main() -> Result<()> {
3434
//init logger
35-
rocketmq_common::log::init_logger();
35+
//rocketmq_common::log::init_logger();
3636

3737
// create a producer builder with default configuration
3838
let builder = DefaultMQPushConsumer::builder();

ā€Žrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rsā€Ž

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use rocketmq_common::TimeUtils::get_current_millis;
3030
use rocketmq_common::WeakCellWrapper;
3131
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
3232
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
33-
use rocketmq_runtime::RocketMQRuntime;
3433
use tracing::info;
3534
use tracing::warn;
3635

@@ -53,7 +52,7 @@ pub struct ConsumeMessageConcurrentlyService {
5352
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
5453
pub(crate) consumer_group: Arc<String>,
5554
pub(crate) message_listener: ArcBoxMessageListenerConcurrently,
56-
pub(crate) consume_runtime: Arc<RocketMQRuntime>,
55+
// pub(crate) consume_runtime: Arc<RocketMQRuntime>,
5756
}
5857

5958
impl ConsumeMessageConcurrentlyService {
@@ -71,10 +70,10 @@ impl ConsumeMessageConcurrentlyService {
7170
consumer_config,
7271
consumer_group: Arc::new(consumer_group),
7372
message_listener,
74-
consume_runtime: Arc::new(RocketMQRuntime::new_multi(
73+
/*consume_runtime: Arc::new(RocketMQRuntime::new_multi(
7574
consume_thread as usize,
7675
"ConsumeMessageThread_",
77-
)),
76+
)),*/
7877
}
7978
}
8079
}
@@ -140,12 +139,6 @@ impl ConsumeMessageConcurrentlyService {
140139
}
141140
if !msg_back_failed.is_empty() {
142141
consume_request.msgs.append(&mut msg_back_success);
143-
/* let msg_back_failed_switched = msg_back_failed
144-
.into_iter()
145-
.map(|msg| MessageClientExt {
146-
message_ext_inner: msg,
147-
})
148-
.collect();*/
149142
self.submit_consume_request_later(
150143
msg_back_failed,
151144
consume_request.process_queue.clone(),
@@ -182,7 +175,7 @@ impl ConsumeMessageConcurrentlyService {
182175
message_queue: MessageQueue,
183176
) {
184177
let this = self.clone();
185-
self.consume_runtime.get_handle().spawn(async move {
178+
tokio::spawn(async move {
186179
tokio::time::sleep(Duration::from_secs(5)).await;
187180
this.submit_consume_request(msgs, process_queue, message_queue, true)
188181
.await;
@@ -231,8 +224,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
231224
});
232225
}
233226

234-
fn shutdown(&self, await_terminate_millis: u64) {
235-
todo!()
227+
fn shutdown(&mut self, await_terminate_millis: u64) {
228+
// todo!()
236229
}
237230

238231
fn update_core_pool_size(&self, core_pool_size: usize) {
@@ -267,7 +260,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
267260
dispatch_to_consume: bool,
268261
) {
269262
let consume_batch_size = self.consumer_config.consume_message_batch_max_size;
270-
if msgs.len() < consume_batch_size as usize {
263+
if msgs.len() <= consume_batch_size as usize {
271264
let mut consume_request = ConsumeRequest {
272265
msgs: msgs.clone(),
273266
message_listener: self.message_listener.clone(),
@@ -278,7 +271,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
278271
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
279272
};
280273
let consume_message_concurrently_service = self.clone();
281-
self.consume_runtime.get_handle().spawn(async move {
274+
275+
tokio::spawn(async move {
282276
consume_request
283277
.run(consume_message_concurrently_service)
284278
.await
@@ -301,7 +295,12 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
301295
default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(),
302296
};
303297
let consume_message_concurrently_service = self.clone();
304-
self.consume_runtime.get_handle().spawn(async move {
298+
/* self.consume_runtime.get_handle().spawn(async move {
299+
consume_request
300+
.run(consume_message_concurrently_service)
301+
.await
302+
});*/
303+
tokio::spawn(async move {
305304
consume_request
306305
.run(consume_message_concurrently_service)
307306
.await

ā€Žrocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rsā€Ž

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService {
3333
todo!()
3434
}
3535

36-
fn shutdown(&self, await_terminate_millis: u64) {
37-
todo!()
36+
fn shutdown(&mut self, await_terminate_millis: u64) {
37+
unimplemented!("shutdown")
3838
}
3939

4040
fn update_core_pool_size(&self, core_pool_size: usize) {

ā€Žrocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rsā€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
6161
// nothing to do
6262
}
6363

64-
fn shutdown(&self, await_terminate_millis: u64) {
64+
fn shutdown(&mut self, await_terminate_millis: u64) {
6565
todo!()
6666
}
6767

ā€Žrocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rsā€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService {
3333
todo!()
3434
}
3535

36-
fn shutdown(&self, await_terminate_millis: u64) {
36+
fn shutdown(&mut self, await_terminate_millis: u64) {
3737
todo!()
3838
}
3939

ā€Žrocketmq-client/src/consumer/consumer_impl/consume_message_service.rsā€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ where
4545
pub trait ConsumeMessageServiceTrait {
4646
fn start(&mut self);
4747

48-
fn shutdown(&self, await_terminate_millis: u64);
48+
fn shutdown(&mut self, await_terminate_millis: u64);
4949

5050
fn update_core_pool_size(&self, core_pool_size: usize);
5151

ā€Žrocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rsā€Ž

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
4747
use rocketmq_remoting::protocol::namespace_util::NamespaceUtil;
4848
use rocketmq_remoting::runtime::RPCHook;
4949
use tokio::runtime::Handle;
50+
use tokio::sync::Mutex;
5051
use tracing::error;
5152
use tracing::info;
5253
use tracing::warn;
@@ -96,6 +97,7 @@ const _1MB: u64 = 1024 * 1024;
9697

9798
#[derive(Clone)]
9899
pub struct DefaultMQPushConsumerImpl {
100+
pub(crate) global_lock: Arc<Mutex<()>>,
99101
pub(crate) pull_time_delay_mills_when_exception: u64,
100102
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
101103
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
@@ -139,6 +141,7 @@ impl DefaultMQPushConsumerImpl {
139141
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
140142
) -> Self {
141143
let mut this = Self {
144+
global_lock: Arc::new(Default::default()),
142145
pull_time_delay_mills_when_exception: 3_000,
143146
client_config: ArcRefCellWrapper::new(client_config.clone()),
144147
consumer_config: consumer_config.clone(),
@@ -372,6 +375,52 @@ impl DefaultMQPushConsumerImpl {
372375
Ok(())
373376
}
374377

378+
pub async fn shutdown(&mut self, await_terminate_millis: u64) {
379+
let _lock = self.global_lock.lock().await;
380+
match *self.service_state {
381+
ServiceState::CreateJust => {
382+
warn!(
383+
"the consumer [{}] do not start, so do nothing",
384+
self.consumer_config.consumer_group
385+
);
386+
}
387+
ServiceState::Running => {
388+
if let Some(consume_message_concurrently_service) =
389+
self.consume_message_concurrently_service.as_mut()
390+
{
391+
consume_message_concurrently_service
392+
.consume_message_concurrently_service
393+
.shutdown(await_terminate_millis);
394+
}
395+
self.persist_consumer_offset().await;
396+
let client = self.client_instance.as_mut().unwrap();
397+
client
398+
.unregister_consumer(self.consumer_config.consumer_group.as_str())
399+
.await;
400+
client.shutdown().await;
401+
info!(
402+
"the consumer [{}] shutdown OK",
403+
self.consumer_config.consumer_group.as_str()
404+
);
405+
self.rebalance_impl.destroy();
406+
*self.service_state = ServiceState::ShutdownAlready;
407+
}
408+
ServiceState::ShutdownAlready => {
409+
warn!(
410+
"the consumer [{}] has been shutdown, do nothing",
411+
self.consumer_config.consumer_group
412+
);
413+
}
414+
ServiceState::StartFailed => {
415+
warn!(
416+
"the consumer [{}] start failed, do nothing",
417+
self.consumer_config.consumer_group
418+
);
419+
}
420+
}
421+
drop(_lock);
422+
}
423+
375424
async fn update_topic_subscribe_info_when_subscription_changed(&mut self) {
376425
if DO_NOT_UPDATE_TOPIC_SUBSCRIBE_INFO_WHEN_SUBSCRIPTION_CHANGED {
377426
return;
@@ -1239,8 +1288,27 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl {
12391288
Ok(false)
12401289
}
12411290

1242-
fn persist_consumer_offset(&self) {
1243-
todo!()
1291+
async fn persist_consumer_offset(&self) {
1292+
if let Err(err) = self.make_sure_state_ok() {
1293+
error!(
1294+
"group: {} persistConsumerOffset exception:{}",
1295+
self.consumer_config.consumer_group, err
1296+
);
1297+
} else {
1298+
let guard = self
1299+
.rebalance_impl
1300+
.rebalance_impl_inner
1301+
.process_queue_table
1302+
.read()
1303+
.await;
1304+
let allocate_mq = guard.keys().cloned().collect::<HashSet<_>>();
1305+
self.offset_store
1306+
.as_ref()
1307+
.unwrap()
1308+
.mut_from_ref()
1309+
.persist_all(&allocate_mq)
1310+
.await;
1311+
}
12441312
}
12451313

12461314
async fn update_topic_subscribe_info(&mut self, topic: &str, info: &HashSet<MessageQueue>) {

ā€Žrocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rsā€Ž

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use crate::factory::mq_client_instance::MQClientInstance;
4343
use crate::hook::filter_message_context::FilterMessageContext;
4444
use crate::hook::filter_message_hook::FilterMessageHook;
4545
use crate::implementation::communication_mode::CommunicationMode;
46+
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
4647
use crate::Result;
4748

4849
#[derive(Clone)]
@@ -107,6 +108,7 @@ impl PullAPIWrapper {
107108
self.client_instance.client_config.decode_read_body,
108109
self.client_instance.client_config.decode_decompress_body,
109110
);
111+
110112
let mut need_decode_inner_message = false;
111113
for msg in &msg_vec {
112114
if MessageSysFlag::check(
@@ -191,6 +193,7 @@ impl PullAPIWrapper {
191193
msg.message_ext_inner.queue_offset += offset_delta;
192194
}
193195
}
196+
194197
pull_result_ext.pull_result.msg_found_list = msg_list_filter_again
195198
.into_iter()
196199
.map(ArcRefCellWrapper::new)
@@ -233,7 +236,7 @@ impl PullAPIWrapper {
233236
pull_callback: PCB,
234237
) -> Result<Option<PullResultExt>>
235238
where
236-
PCB: PullCallback,
239+
PCB: PullCallback + 'static,
237240
{
238241
let broker_name = self
239242
.client_instance
@@ -320,16 +323,16 @@ impl PullAPIWrapper {
320323
.compute_pull_from_which_filter_server(mq.get_topic(), broker_addr.as_str())
321324
.await?;
322325
}
323-
self.client_instance
324-
.get_mq_client_api_impl()
325-
.pull_message(
326-
broker_addr.as_str(),
327-
request_header,
328-
timeout_millis,
329-
communication_mode,
330-
pull_callback,
331-
)
332-
.await
326+
327+
MQClientAPIImpl::pull_message(
328+
self.client_instance.get_mq_client_api_impl(),
329+
broker_addr,
330+
request_header,
331+
timeout_millis,
332+
communication_mode,
333+
pull_callback,
334+
)
335+
.await
333336
} else {
334337
Err(MQClientErr(
335338
-1,

ā€Žrocketmq-client/src/consumer/consumer_impl/re_balance.rsā€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,5 @@ pub trait RebalanceLocal {
7070
async fn do_rebalance(&mut self, is_order: bool) -> bool;
7171

7272
fn client_rebalance(&mut self, topic: &str) -> bool;
73+
fn destroy(&mut self);
7374
}

0 commit comments

Comments
Ā (0)