Skip to content

Commit 578f7e3

Browse files
committed
[ISSUE #1293]🔥Rocketmq-client supports the AllocateMessageQueueStrategy algorithm-AllocateMessageQueueAveragelyByCircle🚀
1 parent 79a345a commit 578f7e3

4 files changed

Lines changed: 196 additions & 4 deletions

File tree

rocketmq-client/src/consumer/allocate_message_queue_strategy.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,21 @@ use rocketmq_common::common::message::message_queue::MessageQueue;
1919

2020
use crate::Result;
2121

22+
/// Trait for allocating message queues to consumers in a consumer group.
23+
/// This trait is implemented by different strategies for message queue allocation.
2224
pub trait AllocateMessageQueueStrategy: Send + Sync {
25+
/// Allocates message queues to a consumer in a consumer group.
26+
///
27+
/// # Arguments
28+
///
29+
/// * `consumer_group` - The name of the consumer group.
30+
/// * `current_cid` - The ID of the current consumer.
31+
/// * `mq_all` - A slice of all available message queues.
32+
/// * `cid_all` - A slice of all consumer IDs in the consumer group.
33+
///
34+
/// # Returns
35+
///
36+
/// A `Result` containing a vector of allocated message queues or an error.
2337
fn allocate(
2438
&self,
2539
consumer_group: &CheetahString,
@@ -28,5 +42,10 @@ pub trait AllocateMessageQueueStrategy: Send + Sync {
2842
cid_all: &[CheetahString],
2943
) -> Result<Vec<MessageQueue>>;
3044

45+
/// Returns the name of the allocation strategy.
46+
///
47+
/// # Returns
48+
///
49+
/// A static string slice representing the name of the strategy.
3150
fn get_name(&self) -> &'static str;
3251
}

rocketmq-client/src/consumer/rebalance_strategy.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717
pub mod allocate_message_queue_averagely;
18+
mod allocate_message_queue_averagely_by_circle;
1819

1920
use std::collections::HashSet;
2021

@@ -26,8 +27,8 @@ use crate::error::MQClientError::IllegalArgumentError;
2627
use crate::Result;
2728

2829
pub fn check(
29-
consumer_group: &str,
30-
current_cid: &str,
30+
consumer_group: &CheetahString,
31+
current_cid: &CheetahString,
3132
mq_all: &[MessageQueue],
3233
cid_all: &[CheetahString],
3334
) -> Result<bool> {
@@ -44,9 +45,9 @@ pub fn check(
4445
"cidAll is null or cidAll empty".to_string(),
4546
));
4647
}
47-
let current_cid: CheetahString = current_cid.to_string().into();
48+
4849
let cid_set: HashSet<_> = cid_all.iter().collect();
49-
if !cid_set.contains(&current_cid) {
50+
if !cid_set.contains(current_cid) {
5051
info!(
5152
"[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {:?}",
5253
consumer_group, current_cid, cid_all
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use cheetah_string::CheetahString;
18+
use rocketmq_common::common::message::message_queue::MessageQueue;
19+
20+
use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
21+
use crate::consumer::rebalance_strategy::check;
22+
23+
pub struct AllocateMessageQueueAveragelyByCircle;
24+
25+
impl AllocateMessageQueueStrategy for AllocateMessageQueueAveragelyByCircle {
26+
fn allocate(
27+
&self,
28+
consumer_group: &CheetahString,
29+
current_cid: &CheetahString,
30+
mq_all: &[MessageQueue],
31+
cid_all: &[CheetahString],
32+
) -> crate::Result<Vec<MessageQueue>> {
33+
let mut result = Vec::new();
34+
if !check(consumer_group, current_cid, mq_all, cid_all)? {
35+
return Ok(result);
36+
}
37+
let index = cid_all
38+
.iter()
39+
.position(|cid| cid == current_cid)
40+
.unwrap_or(0);
41+
for (i, item) in mq_all.iter().enumerate().skip(index) {
42+
if i % cid_all.len() == index {
43+
result.push(item.clone());
44+
}
45+
}
46+
Ok(result)
47+
}
48+
49+
#[inline]
50+
fn get_name(&self) -> &'static str {
51+
"AVG_BY_CIRCLE"
52+
}
53+
}
54+
55+
#[cfg(test)]
56+
mod tests {
57+
use std::collections::HashMap;
58+
59+
use cheetah_string::CheetahString;
60+
use rocketmq_common::common::message::message_queue::MessageQueue;
61+
62+
use super::*;
63+
64+
#[test]
65+
fn allocate_returns_empty_when_check_fails() {
66+
let strategy = AllocateMessageQueueAveragelyByCircle;
67+
let consumer_group = CheetahString::from("test_group");
68+
let current_cid = CheetahString::from("consumer1");
69+
let mq_all = vec![MessageQueue::from_parts("topic", "broker", 0)];
70+
let cid_all = vec![CheetahString::from("consumer1")];
71+
72+
let result = strategy
73+
.allocate(&consumer_group, &current_cid, &mq_all, &cid_all)
74+
.unwrap();
75+
assert!(!result.is_empty());
76+
}
77+
78+
#[test]
79+
fn allocate_returns_correct_queues_for_single_consumer() {
80+
let strategy = AllocateMessageQueueAveragelyByCircle;
81+
let consumer_group = CheetahString::from("test_group");
82+
let current_cid = CheetahString::from("consumer1");
83+
let mq_all = vec![
84+
MessageQueue::from_parts("topic", "broker", 0),
85+
MessageQueue::from_parts("topic", "broker", 1),
86+
];
87+
let cid_all = vec![CheetahString::from("consumer1")];
88+
89+
let result = strategy
90+
.allocate(&consumer_group, &current_cid, &mq_all, &cid_all)
91+
.unwrap();
92+
assert_eq!(result.len(), 2);
93+
assert_eq!(result[0].get_queue_id(), 0);
94+
assert_eq!(result[1].get_queue_id(), 1);
95+
}
96+
97+
#[test]
98+
fn allocate_returns_correct_queues_for_multiple_consumers() {
99+
let strategy = AllocateMessageQueueAveragelyByCircle;
100+
let consumer_group = CheetahString::from("test_group");
101+
let current_cid = CheetahString::from("consumer2");
102+
let mq_all = vec![
103+
MessageQueue::from_parts("topic", "broker", 0),
104+
MessageQueue::from_parts("topic", "broker", 1),
105+
MessageQueue::from_parts("topic", "broker", 2),
106+
];
107+
let cid_all = vec![
108+
CheetahString::from("consumer1"),
109+
CheetahString::from("consumer2"),
110+
CheetahString::from("consumer3"),
111+
];
112+
113+
let result = strategy
114+
.allocate(&consumer_group, &current_cid, &mq_all, &cid_all)
115+
.unwrap();
116+
assert_eq!(result.len(), 1);
117+
assert_eq!(result[0].get_queue_id(), 1);
118+
}
119+
120+
#[test]
121+
fn get_name_returns_correct_name() {
122+
let strategy = AllocateMessageQueueAveragelyByCircle;
123+
assert_eq!(strategy.get_name(), "AVG_BY_CIRCLE");
124+
}
125+
126+
#[test]
127+
fn test_allocate_message_queue_averagely_by_circle() {
128+
let consumer_id_list = create_consumer_id_list(4);
129+
let message_queue_list = create_message_queue_list(10);
130+
let allocate_queues = AllocateMessageQueueAveragelyByCircle
131+
.allocate(
132+
&CheetahString::from(""),
133+
&CheetahString::from("CID_PREFIX"),
134+
&message_queue_list,
135+
&consumer_id_list,
136+
)
137+
.unwrap();
138+
assert_eq!(0, allocate_queues.len());
139+
140+
let mut consumer_allocate_queue = HashMap::new();
141+
for consumer_id in &consumer_id_list {
142+
let queues = AllocateMessageQueueAveragelyByCircle
143+
.allocate(
144+
&CheetahString::from(""),
145+
consumer_id,
146+
&message_queue_list,
147+
&consumer_id_list,
148+
)
149+
.unwrap();
150+
let queue_ids: Vec<i32> = queues.iter().map(|q| q.get_queue_id()).collect();
151+
consumer_allocate_queue.insert(consumer_id.clone(), queue_ids);
152+
}
153+
assert_eq!(vec![0, 4, 8], consumer_allocate_queue["CID_PREFIX0"]);
154+
assert_eq!(vec![1, 5, 9], consumer_allocate_queue["CID_PREFIX1"]);
155+
assert_eq!(vec![2, 6], consumer_allocate_queue["CID_PREFIX2"]);
156+
assert_eq!(vec![3, 7], consumer_allocate_queue["CID_PREFIX3"]);
157+
}
158+
fn create_consumer_id_list(size: usize) -> Vec<CheetahString> {
159+
(0..size)
160+
.map(|i| format!("CID_PREFIX{}", i).into())
161+
.collect()
162+
}
163+
164+
fn create_message_queue_list(size: usize) -> Vec<MessageQueue> {
165+
(0..size)
166+
.map(|i| MessageQueue::from_parts("topic", "broker", i as i32))
167+
.collect()
168+
}
169+
}

rocketmq-common/src/common/message/message_queue.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ impl MessageQueue {
6161
}
6262
}
6363

64+
#[inline]
6465
pub fn get_topic(&self) -> &str {
6566
&self.topic
6667
}
@@ -80,6 +81,7 @@ impl MessageQueue {
8081
&self.broker_name
8182
}
8283

84+
#[inline]
8385
pub fn set_broker_name(&mut self, broker_name: CheetahString) {
8486
self.broker_name = broker_name;
8587
}
@@ -89,6 +91,7 @@ impl MessageQueue {
8991
self.queue_id
9092
}
9193

94+
#[inline]
9295
pub fn set_queue_id(&mut self, queue_id: i32) {
9396
self.queue_id = queue_id;
9497
}

0 commit comments

Comments
 (0)