Skip to content

Commit 870b3d2

Browse files
committed
[ISSUE #1301]🔥Rocketmq-rust broker supports request code QUERY_ASSIGNMENT(400)🚀
1 parent 9956514 commit 870b3d2

4 files changed

Lines changed: 297 additions & 7 deletions

File tree

rocketmq-broker/src/broker_runtime.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ use crate::schedule::schedule_message_service::ScheduleMessageService;
8181
use crate::subscription::manager::subscription_group_manager::SubscriptionGroupManager;
8282
use crate::topic::manager::topic_config_manager::TopicConfigManager;
8383
use crate::topic::manager::topic_queue_mapping_manager::TopicQueueMappingManager;
84+
use crate::topic::manager::topic_route_info_manager::TopicRouteInfoManager;
8485
use crate::topic::topic_queue_mapping_clean_service::TopicQueueMappingCleanService;
8586
use crate::transaction::queue::default_transactional_message_check_listener::DefaultTransactionalMessageCheckListener;
8687
use crate::transaction::queue::default_transactional_message_service::DefaultTransactionalMessageService;
@@ -133,6 +134,7 @@ pub(crate) struct BrokerRuntime {
133134
Option<Arc<DefaultTransactionalMessageCheckListener<DefaultMessageStore>>>,
134135
transactional_message_check_service: Option<Arc<TransactionalMessageCheckService>>,
135136
transaction_metrics_flush_service: Option<Arc<TransactionMetricsFlushService>>,
137+
topic_route_info_manager: Arc<TopicRouteInfoManager>,
136138
}
137139

138140
impl Clone for BrokerRuntime {
@@ -171,6 +173,7 @@ impl Clone for BrokerRuntime {
171173
transactional_message_check_listener: self.transactional_message_check_listener.clone(),
172174
transactional_message_check_service: None,
173175
transaction_metrics_flush_service: None,
176+
topic_route_info_manager: self.topic_route_info_manager.clone(),
174177
}
175178
}
176179
}
@@ -239,7 +242,7 @@ impl BrokerRuntime {
239242
broker_stats: None,
240243
schedule_message_service: Default::default(),
241244
timer_message_store: None,
242-
broker_out_api: broker_outer_api,
245+
broker_out_api: broker_outer_api.clone(),
243246
broker_runtime: Some(runtime),
244247
producer_manager,
245248
consumer_manager,
@@ -259,6 +262,10 @@ impl BrokerRuntime {
259262
transactional_message_check_listener: None,
260263
transactional_message_check_service: None,
261264
transaction_metrics_flush_service: None,
265+
topic_route_info_manager: Arc::new(TopicRouteInfoManager::new(
266+
broker_outer_api,
267+
broker_config,
268+
)),
262269
}
263270
}
264271

@@ -540,6 +547,9 @@ impl BrokerRuntime {
540547
consumer_manage_processor: ArcMut::new(consumer_manage_processor),
541548
query_assignment_processor: ArcMut::new(QueryAssignmentProcessor::new(
542549
self.message_store_config.clone(),
550+
self.broker_config.clone(),
551+
self.topic_route_info_manager.clone(),
552+
self.consumer_manager.clone(),
543553
)),
544554
query_message_processor: ArcMut::new(query_message_processor),
545555
end_transaction_processor: ArcMut::new(EndTransactionProcessor::new(
@@ -736,6 +746,8 @@ impl BrokerRuntime {
736746
let this = pull_request_hold_service.clone();
737747
pull_request_hold_service.start(this);
738748
}
749+
750+
self.topic_route_info_manager.start();
739751
}
740752

741753
async fn update_namesrv_addr(&mut self) {

rocketmq-broker/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,10 @@ pub enum BrokerError {
2727

2828
#[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
2929
MQBrokerError(i32, String, String),
30+
31+
#[error("{0}")]
32+
IllegalArgumentError(String),
33+
34+
#[error("Client error: {0}")]
35+
ClientError(#[from] rocketmq_client_rust::error::MQClientError),
3036
}

rocketmq-broker/src/processor/query_assignment_processor.rs

Lines changed: 276 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,43 @@ use crate::load_balance::message_request_mode_manager::MessageRequestModeManager
2525
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
2626
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
2727
use rocketmq_store::config::message_store_config::MessageStoreConfig;
28-
use std::collections::HashMap;
28+
use std::collections::{HashMap, HashSet};
2929
use std::sync::Arc;
30+
use tracing::warn;
31+
use rocketmq_common::common::broker::broker_config::BrokerConfig;
32+
use rocketmq_common::common::message::message_enum::MessageRequestMode;
33+
use rocketmq_common::common::message::message_queue::MessageQueue;
34+
use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
35+
use rocketmq_common::common::mix_all;
3036
use rocketmq_common::common::mix_all::RETRY_GROUP_TOPIC_PREFIX;
3137
use rocketmq_remoting::code::response_code::ResponseCode;
38+
use rocketmq_remoting::protocol::body::query_assignment_request_body::QueryAssignmentRequestBody;
39+
use rocketmq_remoting::protocol::body::query_assignment_response_body::QueryAssignmentResponseBody;
3240
use rocketmq_remoting::protocol::body::set_message_request_mode_request_body::SetMessageRequestModeRequestBody;
33-
use rocketmq_remoting::protocol::RemotingDeserializable;
41+
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
42+
use rocketmq_remoting::protocol::{RemotingDeserializable, RemotingSerializable};
43+
use crate::client::manager::consumer_manager::ConsumerManager;
44+
use crate::error::BrokerError;
45+
use crate::error::BrokerError::IllegalArgumentError;
46+
use crate::topic::manager::topic_route_info_manager::TopicRouteInfoManager;
47+
use crate::Result;
3448

3549
pub struct QueryAssignmentProcessor {
3650
message_request_mode_manager: MessageRequestModeManager,
3751
load_strategy: HashMap<CheetahString, Arc<dyn AllocateMessageQueueStrategy>>,
3852
message_store_config: Arc<MessageStoreConfig>,
53+
broker_config: Arc<BrokerConfig>,
54+
topic_route_info_manager: Arc<TopicRouteInfoManager>,
55+
consumer_manager: Arc<ConsumerManager>,
3956
}
4057

4158
impl QueryAssignmentProcessor {
42-
pub fn new(message_store_config: Arc<MessageStoreConfig>) -> Self {
59+
pub fn new(
60+
message_store_config: Arc<MessageStoreConfig>,
61+
broker_config: Arc<BrokerConfig>,
62+
topic_route_info_manager: Arc<TopicRouteInfoManager>,
63+
consumer_manager: Arc<ConsumerManager>,
64+
) -> Self {
4365
let allocate_message_queue_averagely: Arc<dyn AllocateMessageQueueStrategy> =
4466
Arc::new(AllocateMessageQueueAveragely);
4567
let allocate_message_queue_averagely_by_circle: Arc<dyn AllocateMessageQueueStrategy> =
@@ -59,6 +81,9 @@ impl QueryAssignmentProcessor {
5981
message_request_mode_manager: manager,
6082
load_strategy,
6183
message_store_config,
84+
broker_config,
85+
topic_route_info_manager,
86+
consumer_manager,
6287
}
6388
}
6489
}
@@ -82,11 +107,220 @@ impl QueryAssignmentProcessor {
82107

83108
async fn query_assignment(
84109
&mut self,
85-
_channel: Channel,
110+
channel: Channel,
86111
_ctx: ConnectionHandlerContext,
87-
_request: RemotingCommand,
112+
request: RemotingCommand,
88113
) -> Option<RemotingCommand> {
89-
unimplemented!()
114+
let request_body =
115+
QueryAssignmentRequestBody::decode(request.get_body().expect("empty body"))
116+
.expect("decode QueryAssignmentRequestBody failed");
117+
let set_message_request_mode_request_body = self
118+
.message_request_mode_manager
119+
.get_message_request_mode(&request_body.topic, &request_body.consumer_group);
120+
let set_message_request_mode_request_body =
121+
if let Some(set_message_request_mode_request_body) =
122+
set_message_request_mode_request_body
123+
{
124+
set_message_request_mode_request_body
125+
} else {
126+
let mut body = SetMessageRequestModeRequestBody {
127+
topic: request_body.topic.clone(),
128+
consumer_group: request_body.consumer_group.clone(),
129+
..Default::default()
130+
};
131+
if request_body.topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) {
132+
body.mode = MessageRequestMode::Pull;
133+
} else {
134+
body.mode = self.broker_config.default_message_request_mode;
135+
}
136+
if body.mode == MessageRequestMode::Pop {
137+
body.pop_share_queue_num = self.broker_config.default_pop_share_queue_num;
138+
}
139+
body
140+
};
141+
let mode = set_message_request_mode_request_body.mode;
142+
let message_queues = self
143+
.do_load_balance(
144+
&request_body.topic,
145+
&request_body.consumer_group,
146+
&request_body.client_id,
147+
request_body.message_model,
148+
&request_body.strategy_name,
149+
set_message_request_mode_request_body,
150+
channel,
151+
)
152+
.await;
153+
let assignments = if let Some(message_queues) = message_queues {
154+
message_queues
155+
.into_iter()
156+
.map(|mq| MessageQueueAssignment {
157+
message_queue: Some(mq),
158+
mode,
159+
attachments: None,
160+
})
161+
.collect()
162+
} else {
163+
HashSet::new()
164+
};
165+
let body = QueryAssignmentResponseBody {
166+
message_queue_assignments: assignments,
167+
};
168+
Some(RemotingCommand::create_response_command().set_body(body.encode()))
169+
}
170+
171+
async fn do_load_balance(
172+
&mut self,
173+
topic: &CheetahString,
174+
consumer_group: &CheetahString,
175+
client_id: &CheetahString,
176+
message_model: MessageModel,
177+
strategy_name: &CheetahString,
178+
set_message_request_mode_request_body: SetMessageRequestModeRequestBody,
179+
channel: Channel,
180+
) -> Option<HashSet<MessageQueue>> {
181+
match message_model {
182+
MessageModel::Broadcasting => {
183+
let assigned_queue_set = self
184+
.topic_route_info_manager
185+
.get_topic_subscribe_info(topic)
186+
.await;
187+
if assigned_queue_set.is_none() {
188+
warn!(
189+
"QueryLoad: no assignment for group[{}], the topic[{}] does not exist.",
190+
consumer_group, topic
191+
);
192+
}
193+
assigned_queue_set
194+
}
195+
MessageModel::Clustering => {
196+
let mq_set = if mix_all::is_lmq(Some(topic.as_str())) {
197+
let mut set = HashSet::new();
198+
let queue = MessageQueue::from_parts(
199+
topic.clone(),
200+
self.broker_config.broker_name.clone(),
201+
mix_all::LMQ_QUEUE_ID as i32,
202+
);
203+
set.insert(queue);
204+
Some(set)
205+
} else {
206+
self.topic_route_info_manager
207+
.get_topic_subscribe_info(topic)
208+
.await
209+
};
210+
211+
if mq_set.is_none() || mq_set.as_ref().unwrap().is_empty() {
212+
if topic.starts_with(RETRY_GROUP_TOPIC_PREFIX) {
213+
warn!(
214+
"QueryLoad: no assignment for group[{}], the topic[{}] does not exist.",
215+
consumer_group, topic
216+
);
217+
}
218+
return None;
219+
}
220+
221+
if !self.broker_config.server_load_balancer_enable {
222+
return mq_set;
223+
}
224+
let consumer_group_info = self
225+
.consumer_manager
226+
.get_consumer_group_info(consumer_group);
227+
let mut cid_all = if let Some(consumer_group_info) = consumer_group_info {
228+
consumer_group_info.get_all_client_ids()
229+
} else {
230+
vec![]
231+
};
232+
if cid_all.is_empty() {
233+
warn!(
234+
"QueryLoad: no assignment for group[{}] topic[{}], get consumer id list \
235+
failed",
236+
consumer_group, topic
237+
);
238+
return None;
239+
}
240+
let mut mq_all = mq_set.unwrap().into_iter().collect::<Vec<MessageQueue>>();
241+
mq_all.sort();
242+
cid_all.sort();
243+
244+
let strategy = self.load_strategy.get(strategy_name);
245+
if strategy.is_none() {
246+
warn!(
247+
"QueryLoad: unsupported strategy [{}], {}",
248+
strategy_name,
249+
channel.remote_address()
250+
);
251+
return None;
252+
}
253+
let strategy = strategy.unwrap();
254+
let result =
255+
if set_message_request_mode_request_body.mode == MessageRequestMode::Pop {
256+
self.allocate_for_pop(
257+
strategy,
258+
consumer_group,
259+
client_id,
260+
mq_all.as_slice(),
261+
cid_all.as_slice(),
262+
set_message_request_mode_request_body.pop_share_queue_num,
263+
)
264+
} else {
265+
match strategy.allocate(
266+
consumer_group,
267+
client_id,
268+
mq_all.as_slice(),
269+
cid_all.as_slice(),
270+
) {
271+
Ok(value) => Ok(value.into_iter().collect::<HashSet<MessageQueue>>()),
272+
Err(e) => Err(BrokerError::ClientError(e)),
273+
}
274+
};
275+
match result {
276+
Ok(value) => Some(value),
277+
Err(_) => None,
278+
}
279+
}
280+
}
281+
}
282+
283+
pub fn allocate_for_pop(
284+
&self,
285+
strategy: &Arc<dyn AllocateMessageQueueStrategy>,
286+
consumer_group: &CheetahString,
287+
current_cid: &CheetahString,
288+
mq_all: &[MessageQueue],
289+
cid_all: &[CheetahString],
290+
pop_share_queue_num: i32,
291+
) -> Result<HashSet<MessageQueue>> {
292+
if pop_share_queue_num <= 0 || pop_share_queue_num > cid_all.len() as i32 - 1 {
293+
Ok(mq_all
294+
.iter()
295+
.map(|mq| {
296+
MessageQueue::from_parts(
297+
mq.get_topic_cs().clone(),
298+
mq.get_broker_name().clone(),
299+
-1,
300+
)
301+
})
302+
.collect::<HashSet<MessageQueue>>())
303+
} else if cid_all.len() <= mq_all.len() {
304+
let mut allocate_result = strategy
305+
.allocate(consumer_group, current_cid, mq_all, cid_all)
306+
.unwrap();
307+
let index = cid_all.iter().position(|cid| cid == current_cid);
308+
if let Some(mut index) = index {
309+
for _i in 1..pop_share_queue_num {
310+
index += index;
311+
index %= cid_all.len();
312+
let result = strategy
313+
.allocate(consumer_group, &cid_all[index], mq_all, cid_all)
314+
.unwrap();
315+
allocate_result.extend(result);
316+
}
317+
}
318+
Ok(allocate_result
319+
.into_iter()
320+
.collect::<HashSet<MessageQueue>>())
321+
} else {
322+
allocate(consumer_group, current_cid, mq_all, cid_all)
323+
}
90324
}
91325

92326
async fn set_message_request_mode(
@@ -117,3 +351,39 @@ impl QueryAssignmentProcessor {
117351
))
118352
}
119353
}
354+
355+
fn allocate(
356+
consumer_group: &CheetahString,
357+
current_cid: &CheetahString,
358+
mq_all: &[MessageQueue],
359+
cid_all: &[CheetahString],
360+
) -> Result<HashSet<MessageQueue>> {
361+
if current_cid.is_empty() {
362+
return Err(IllegalArgumentError("currentCID is empty".to_string()));
363+
}
364+
if mq_all.is_empty() {
365+
return Err(IllegalArgumentError(
366+
"mqAll is null or mqAll empty".to_string(),
367+
));
368+
}
369+
if cid_all.is_empty() {
370+
return Err(IllegalArgumentError(
371+
"cidAll is null or cidAll empty".to_string(),
372+
));
373+
}
374+
375+
let mut result = HashSet::new();
376+
if !cid_all.contains(current_cid) {
377+
log::info!(
378+
"[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {:?}",
379+
consumer_group,
380+
current_cid,
381+
cid_all
382+
);
383+
return Ok(result);
384+
}
385+
386+
let index = cid_all.iter().position(|cid| cid == current_cid).unwrap();
387+
result.insert(mq_all[index % mq_all.len()].clone());
388+
Ok(result)
389+
}

0 commit comments

Comments
 (0)