Skip to content

Commit c54bd64

Browse files
authored
[ISSUE #964]🚀Support client consumer message-2🚀 (#965)
1 parent c6def60 commit c54bd64

20 files changed

Lines changed: 913 additions & 142 deletions

rocketmq-client/src/consumer/allocate_message_queue_strategy.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@
1616
*/
1717
use rocketmq_common::common::message::message_queue::MessageQueue;
1818

19+
use crate::Result;
20+
1921
pub trait AllocateMessageQueueStrategy: Send + Sync {
2022
fn allocate(
2123
&self,
2224
consumer_group: &str,
2325
current_cid: &str,
2426
mq_all: &[MessageQueue],
2527
cid_all: &[String],
26-
) -> Vec<MessageQueue>;
28+
) -> Result<Vec<MessageQueue>>;
2729

2830
fn get_name(&self) -> &str;
2931
}

rocketmq-client/src/consumer/consumer_impl.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,26 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use once_cell::sync::Lazy;
18+
1719
pub(crate) mod consume_message_concurrently_service;
1820
pub(crate) mod consume_message_orderly_service;
1921
pub(crate) mod consume_message_pop_concurrently_service;
2022
pub(crate) mod consume_message_pop_orderly_service;
2123
pub(crate) mod consume_message_service;
2224
pub(crate) mod default_mq_push_consumer_impl;
25+
pub(crate) mod message_request;
2326
pub(crate) mod pop_process_queue;
27+
pub(crate) mod pop_request;
2428
pub(crate) mod process_queue;
2529
pub(crate) mod pull_api_wrapper;
2630
pub(crate) mod pull_message_service;
27-
pub(crate) mod rebalance_impl;
28-
pub(crate) mod rebalance_push_impl;
29-
pub(crate) mod rebalance_service;
31+
pub(crate) mod pull_request;
32+
pub(crate) mod re_balance;
33+
34+
pub(crate) static PULL_MAX_IDLE_TIME: Lazy<u64> = Lazy::new(|| {
35+
std::env::var("rocketmq.client.pull.pullMaxIdleTime")
36+
.unwrap_or_else(|_| "120000".into())
37+
.parse()
38+
.unwrap_or(120000)
39+
});

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashSet;
1919
use std::sync::atomic::AtomicBool;
20+
use std::sync::atomic::Ordering;
2021
use std::sync::Arc;
2122

2223
use rocketmq_common::common::base::service_state::ServiceState;
@@ -44,7 +45,7 @@ use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageConcu
4445
use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageOrderlyServiceGeneral;
4546
use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait;
4647
use crate::consumer::consumer_impl::pull_api_wrapper::PullAPIWrapper;
47-
use crate::consumer::consumer_impl::rebalance_push_impl::RebalancePushImpl;
48+
use crate::consumer::consumer_impl::re_balance::rebalance_push_impl::RebalancePushImpl;
4849
use crate::consumer::default_mq_push_consumer::ConsumerConfig;
4950
use crate::consumer::listener::message_listener::MessageListener;
5051
use crate::consumer::mq_consumer_inner::MQConsumerInner;
@@ -76,7 +77,7 @@ pub struct DefaultMQPushConsumerImpl {
7677
filter_message_hook_list: Vec<Arc<Box<dyn FilterMessageHook + Send + Sync>>>,
7778
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
7879
service_state: ServiceState,
79-
mq_client_factory: Option<ArcRefCellWrapper<MQClientInstance>>,
80+
mq_client_factory: Option<ArcRefCellWrapper<MQClientInstance<DefaultMQPushConsumerImpl>>>,
8081
pull_api_wrapper: Option<ArcRefCellWrapper<PullAPIWrapper>>,
8182
pause: Arc<AtomicBool>,
8283
consume_orderly: bool,
@@ -109,10 +110,13 @@ impl DefaultMQPushConsumerImpl {
109110
consumer_config: ConsumerConfig,
110111
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
111112
) -> Self {
112-
Self {
113-
client_config,
114-
consumer_config,
115-
rebalance_impl: ArcRefCellWrapper::new(RebalancePushImpl),
113+
let mut this = Self {
114+
client_config: client_config.clone(),
115+
consumer_config: consumer_config.clone(),
116+
rebalance_impl: ArcRefCellWrapper::new(RebalancePushImpl::new(
117+
client_config,
118+
consumer_config,
119+
)),
116120
filter_message_hook_list: vec![],
117121
rpc_hook,
118122
service_state: ServiceState::CreateJust,
@@ -129,7 +133,10 @@ impl DefaultMQPushConsumerImpl {
129133
pop_delay_level: Arc::new([
130134
10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200,
131135
]),
132-
}
136+
};
137+
let wrapper = ArcRefCellWrapper::downgrade(&this.rebalance_impl);
138+
this.rebalance_impl.set_rebalance_impl(wrapper);
139+
this
133140
}
134141
}
135142

@@ -138,7 +145,7 @@ impl DefaultMQPushConsumerImpl {
138145
match self.service_state {
139146
ServiceState::CreateJust => {
140147
info!(
141-
"the consumer [{}] start beginning. messageModel={}, isUnitMode={}",
148+
"the consumer [{}] start beginning. message_model={}, isUnitMode={}",
142149
self.consumer_config.consumer_group,
143150
self.consumer_config.message_model,
144151
self.consumer_config.unit_mode
@@ -160,7 +167,12 @@ impl DefaultMQPushConsumerImpl {
160167
self.rebalance_impl
161168
.set_message_model(self.consumer_config.message_model);
162169
self.rebalance_impl.set_allocate_message_queue_strategy(
163-
self.consumer_config.allocate_message_queue_strategy.clone(),
170+
self.consumer_config
171+
.allocate_message_queue_strategy
172+
.clone()
173+
.expect(
174+
"allocate_message_queue_strategy is null, please set it before start",
175+
),
164176
);
165177
self.rebalance_impl
166178
.set_mq_client_factory(client_instance.clone());
@@ -249,7 +261,7 @@ impl DefaultMQPushConsumerImpl {
249261

250262
self.mq_client_factory.as_mut().unwrap().start().await?;
251263
info!(
252-
"the consumer [{}] start OK, messageModel={}, isUnitMode={}",
264+
"the consumer [{}] start OK, message_model={}, isUnitMode={}",
253265
self.consumer_config.consumer_group,
254266
self.consumer_config.message_model,
255267
self.consumer_config.unit_mode
@@ -301,7 +313,7 @@ impl DefaultMQPushConsumerImpl {
301313
return Err(MQClientError::MQClientException(
302314
-1,
303315
format!(
304-
"consumerGroup is empty, {}",
316+
"consumer_group is empty, {}",
305317
FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
306318
),
307319
));
@@ -311,7 +323,7 @@ impl DefaultMQPushConsumerImpl {
311323
return Err(MQClientError::MQClientException(
312324
-1,
313325
format!(
314-
"consumerGroup can not equal {} please specify another one.{}",
326+
"consumer_group can not equal {} please specify another one.{}",
315327
DEFAULT_CONSUMER_GROUP,
316328
FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
317329
),
@@ -340,7 +352,7 @@ impl DefaultMQPushConsumerImpl {
340352
return Err(MQClientError::MQClientException(
341353
-1,
342354
format!(
343-
"allocateMessageQueueStrategy is null{}",
355+
"allocate_message_queue_strategy is null{}",
344356
FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
345357
),
346358
));
@@ -611,8 +623,11 @@ impl MQConsumerInner for DefaultMQPushConsumerImpl {
611623
todo!()
612624
}
613625

614-
fn try_rebalance(&self) -> bool {
615-
todo!()
626+
async fn try_rebalance(&self) -> Result<bool> {
627+
if !self.pause.load(Ordering::Acquire) {
628+
//self.rebalance_impl.do
629+
}
630+
unimplemented!()
616631
}
617632

618633
fn persist_consumer_offset(&self) {

rocketmq-client/src/consumer/consumer_impl/rebalance_impl.rs renamed to rocketmq-client/src/consumer/consumer_impl/message_request.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,8 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
17+
use rocketmq_common::common::message::message_enum::MessageRequestMode;
1818

19-
pub struct RebalanceImpl;
20-
21-
impl RebalanceImpl {
22-
pub fn new() -> Self {
23-
RebalanceImpl {}
24-
}
25-
26-
pub async fn put_subscription_data(&self, topic: &str, subscription_data: SubscriptionData) {
27-
// TODO
28-
unimplemented!("put_subscription_data")
29-
}
19+
pub trait MessageRequest {
20+
fn get_message_request_mode(&self) -> MessageRequestMode;
3021
}

rocketmq-client/src/consumer/consumer_impl/pop_process_queue.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,25 @@
1616
*/
1717
use std::sync::atomic::AtomicUsize;
1818
use std::sync::atomic::Ordering;
19+
use std::sync::Arc;
1920

20-
use once_cell::sync::Lazy;
2121
use rocketmq_common::TimeUtils::get_current_millis;
2222
use rocketmq_remoting::protocol::body::pop_process_queue_info::PopProcessQueueInfo;
2323

24-
static PULL_MAX_IDLE_TIME: Lazy<u64> = Lazy::new(|| {
25-
std::env::var("rocketmq.client.pull.pullMaxIdleTime")
26-
.unwrap_or_else(|_| "120000".into())
27-
.parse()
28-
.unwrap_or(120000)
29-
});
24+
use crate::consumer::consumer_impl::PULL_MAX_IDLE_TIME;
3025

26+
#[derive(Clone)]
3127
pub(crate) struct PopProcessQueue {
3228
last_pop_timestamp: u64,
33-
wait_ack_counter: AtomicUsize,
29+
wait_ack_counter: Arc<AtomicUsize>,
3430
dropped: bool,
3531
}
3632

3733
impl PopProcessQueue {
3834
pub(crate) fn new() -> Self {
3935
PopProcessQueue {
4036
last_pop_timestamp: get_current_millis(),
41-
wait_ack_counter: AtomicUsize::new(0),
37+
wait_ack_counter: Arc::new(AtomicUsize::new(0)),
4238
dropped: false,
4339
}
4440
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::hash::Hash;
18+
use std::hash::Hasher;
19+
20+
use rocketmq_common::common::message::message_enum::MessageRequestMode;
21+
use rocketmq_common::common::message::message_queue::MessageQueue;
22+
23+
use crate::consumer::consumer_impl::message_request::MessageRequest;
24+
use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue;
25+
26+
#[derive(Clone)]
27+
pub struct PopRequest {
28+
topic: String,
29+
consumer_group: String,
30+
message_queue: MessageQueue,
31+
pop_process_queue: PopProcessQueue,
32+
locked_first: bool,
33+
init_mode: i32,
34+
}
35+
36+
impl PopRequest {
37+
pub fn new(
38+
topic: String,
39+
consumer_group: String,
40+
message_queue: MessageQueue,
41+
pop_process_queue: PopProcessQueue,
42+
init_mode: i32,
43+
) -> Self {
44+
PopRequest {
45+
topic,
46+
consumer_group,
47+
message_queue,
48+
pop_process_queue,
49+
locked_first: false,
50+
init_mode,
51+
}
52+
}
53+
54+
pub fn is_locked_first(&self) -> bool {
55+
self.locked_first
56+
}
57+
58+
pub fn set_locked_first(&mut self, locked_first: bool) {
59+
self.locked_first = locked_first;
60+
}
61+
62+
pub fn get_consumer_group(&self) -> &str {
63+
&self.consumer_group
64+
}
65+
66+
pub fn set_consumer_group(&mut self, consumer_group: String) {
67+
self.consumer_group = consumer_group;
68+
}
69+
70+
pub fn get_message_queue(&self) -> &MessageQueue {
71+
&self.message_queue
72+
}
73+
74+
pub fn set_message_queue(&mut self, message_queue: MessageQueue) {
75+
self.message_queue = message_queue;
76+
}
77+
78+
pub fn get_topic(&self) -> &str {
79+
&self.topic
80+
}
81+
82+
pub fn set_topic(&mut self, topic: String) {
83+
self.topic = topic;
84+
}
85+
86+
pub fn get_pop_process_queue(&self) -> &PopProcessQueue {
87+
&self.pop_process_queue
88+
}
89+
90+
pub fn set_pop_process_queue(&mut self, pop_process_queue: PopProcessQueue) {
91+
self.pop_process_queue = pop_process_queue;
92+
}
93+
94+
pub fn get_init_mode(&self) -> i32 {
95+
self.init_mode
96+
}
97+
98+
pub fn set_init_mode(&mut self, init_mode: i32) {
99+
self.init_mode = init_mode;
100+
}
101+
}
102+
103+
impl MessageRequest for PopRequest {
104+
fn get_message_request_mode(&self) -> MessageRequestMode {
105+
MessageRequestMode::Pop
106+
}
107+
}
108+
109+
impl Hash for PopRequest {
110+
fn hash<H: Hasher>(&self, state: &mut H) {
111+
self.topic.hash(state);
112+
self.consumer_group.hash(state);
113+
self.message_queue.hash(state);
114+
}
115+
}
116+
117+
impl PartialEq for PopRequest {
118+
fn eq(&self, other: &Self) -> bool {
119+
self.topic == other.topic
120+
&& self.consumer_group == other.consumer_group
121+
&& self.message_queue == other.message_queue
122+
}
123+
}
124+
125+
impl Eq for PopRequest {}
126+
127+
impl std::fmt::Display for PopRequest {
128+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129+
write!(
130+
f,
131+
"PopRequest [topic={}, consumer_group={}, message_queue={:?}]",
132+
self.topic, self.consumer_group, self.message_queue
133+
)
134+
}
135+
}

0 commit comments

Comments
 (0)