Skip to content

Commit b4d273f

Browse files
committed
[ISSUE #750]✨Add client meta data struct
1 parent 823bca3 commit b4d273f

20 files changed

Lines changed: 934 additions & 383 deletions

rocketmq-common/src/common/message/message_queue.rs

Lines changed: 10 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,16 @@
1717

1818
use std::cmp::Ordering;
1919
use std::fmt;
20-
use std::hash::Hash;
21-
use std::hash::Hasher;
2220

2321
use serde::Deserialize;
2422
use serde::Serialize;
2523

26-
#[derive(Clone, Debug)]
24+
#[derive(Debug, Clone, Hash, Serialize, Deserialize, Eq, Ord, PartialEq, PartialOrd)]
2725
pub struct MessageQueue {
2826
topic: String,
2927
broker_name: String,
3028
queue_id: i32,
3129
}
32-
3330
impl MessageQueue {
3431
pub fn new() -> Self {
3532
MessageQueue {
@@ -47,7 +44,7 @@ impl MessageQueue {
4744
}
4845
}
4946

50-
pub fn with_params(topic: &str, broker_name: &str, queue_id: i32) -> Self {
47+
pub fn from_parts(topic: &str, broker_name: &str, queue_id: i32) -> Self {
5148
MessageQueue {
5249
topic: topic.to_string(),
5350
broker_name: broker_name.to_string(),
@@ -59,16 +56,16 @@ impl MessageQueue {
5956
&self.topic
6057
}
6158

62-
pub fn set_topic(&mut self, topic: &str) {
63-
self.topic = topic.to_string();
59+
pub fn set_topic(&mut self, topic: String) {
60+
self.topic = topic;
6461
}
6562

6663
pub fn get_broker_name(&self) -> &str {
6764
&self.broker_name
6865
}
6966

70-
pub fn set_broker_name(&mut self, broker_name: &str) {
71-
self.broker_name = broker_name.to_string();
67+
pub fn set_broker_name(&mut self, broker_name: String) {
68+
self.broker_name = broker_name;
7269
}
7370

7471
pub fn get_queue_id(&self) -> i32 {
@@ -80,116 +77,18 @@ impl MessageQueue {
8077
}
8178
}
8279

83-
impl PartialEq for MessageQueue {
84-
fn eq(&self, other: &Self) -> bool {
85-
self.topic == other.topic
86-
&& self.broker_name == other.broker_name
87-
&& self.queue_id == other.queue_id
88-
}
89-
}
90-
91-
impl Eq for MessageQueue {}
92-
93-
impl Hash for MessageQueue {
94-
fn hash<H: Hasher>(&self, state: &mut H) {
95-
self.topic.hash(state);
96-
self.broker_name.hash(state);
97-
self.queue_id.hash(state);
98-
}
99-
}
100-
101-
impl Ord for MessageQueue {
102-
fn cmp(&self, other: &Self) -> Ordering {
103-
self.topic
104-
.cmp(&other.topic)
105-
.then_with(|| self.broker_name.cmp(&other.broker_name))
106-
.then_with(|| self.queue_id.cmp(&other.queue_id))
107-
}
108-
}
109-
110-
impl PartialOrd for MessageQueue {
111-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
112-
Some(self.cmp(other))
113-
}
114-
}
115-
11680
impl fmt::Display for MessageQueue {
11781
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
11882
write!(
11983
f,
120-
"MessageQueue [topic={}, broker_name={}, queue_id={}]",
84+
"MessageQueue [topic={}, brokerName={}, queueId={}]",
12185
self.topic, self.broker_name, self.queue_id
12286
)
12387
}
12488
}
12589

126-
#[cfg(test)]
127-
mod message_queue_tests {
128-
use super::*;
129-
130-
#[test]
131-
fn new_creates_empty_message_queue() {
132-
let mq = MessageQueue::new();
133-
assert_eq!(mq.topic, "");
134-
assert_eq!(mq.broker_name, "");
135-
assert_eq!(mq.queue_id, 0);
136-
}
137-
138-
#[test]
139-
fn from_other_copies_all_fields() {
140-
let original = MessageQueue::with_params("topic1", "broker1", 1);
141-
let copy = MessageQueue::from_other(&original);
142-
assert_eq!(copy.topic, "topic1");
143-
assert_eq!(copy.broker_name, "broker1");
144-
assert_eq!(copy.queue_id, 1);
145-
}
146-
147-
#[test]
148-
fn with_params_sets_all_fields() {
149-
let mq = MessageQueue::with_params("topic2", "broker2", 2);
150-
assert_eq!(mq.topic, "topic2");
151-
assert_eq!(mq.broker_name, "broker2");
152-
assert_eq!(mq.queue_id, 2);
153-
}
154-
155-
#[test]
156-
fn set_topic_updates_topic() {
157-
let mut mq = MessageQueue::new();
158-
mq.set_topic("new_topic");
159-
assert_eq!(mq.topic, "new_topic");
160-
}
161-
162-
#[test]
163-
fn set_broker_name_updates_broker_name() {
164-
let mut mq = MessageQueue::new();
165-
mq.set_broker_name("new_broker");
166-
assert_eq!(mq.broker_name, "new_broker");
167-
}
168-
169-
#[test]
170-
fn set_queue_id_updates_queue_id() {
171-
let mut mq = MessageQueue::new();
172-
mq.set_queue_id(3);
173-
assert_eq!(mq.queue_id, 3);
174-
}
175-
176-
#[test]
177-
fn equals_self() {
178-
let mq1 = MessageQueue::with_params("topic", "broker", 1);
179-
assert!(mq1.eq(&mq1));
180-
}
181-
182-
#[test]
183-
fn not_equal_different_topic() {
184-
let mq1 = MessageQueue::with_params("topic1", "broker", 1);
185-
let mq2 = MessageQueue::with_params("topic2", "broker", 1);
186-
assert!(!mq1.eq(&mq2));
187-
}
188-
189-
#[test]
190-
fn not_equal_different_broker_name() {
191-
let mq1 = MessageQueue::with_params("topic", "broker1", 1);
192-
let mq2 = MessageQueue::with_params("topic", "broker2", 1);
193-
assert!(!mq1.eq(&mq2));
90+
impl Default for MessageQueue {
91+
fn default() -> Self {
92+
MessageQueue::new()
19493
}
19594
}

rocketmq-namesrv/src/route/route_info_manager.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,9 +349,9 @@ impl RouteInfoManager {
349349

350350
impl RouteInfoManager {
351351
pub(crate) fn get_all_cluster_info(&self) -> ClusterInfo {
352-
ClusterInfo::new_with_var(
353-
self.broker_addr_table.clone(),
354-
self.cluster_addr_table.clone(),
352+
ClusterInfo::new(
353+
Some(self.broker_addr_table.clone()),
354+
Some(self.cluster_addr_table.clone()),
355355
)
356356
}
357357

rocketmq-remoting/src/protocol/body/broker_body/cluster_info.rs

Lines changed: 6 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
use std::collections::HashMap;
1919
use std::collections::HashSet;
20-
use std::hash::Hash;
21-
use std::hash::Hasher;
2220

2321
use serde::Deserialize;
2422
use serde::Serialize;
@@ -28,75 +26,20 @@ use crate::protocol::route::route_data_view::BrokerData;
2826
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2927
pub struct ClusterInfo {
3028
#[serde(rename = "brokerAddrTable")]
31-
broker_addr_table: HashMap<String, BrokerData>,
29+
pub broker_addr_table: Option<HashMap<String, BrokerData>>,
3230

3331
#[serde(rename = "clusterAddrTable")]
34-
cluster_addr_table: HashMap<String, HashSet<String>>,
32+
pub cluster_addr_table: Option<HashMap<String, HashSet<String>>>,
3533
}
3634

3735
impl ClusterInfo {
38-
pub fn new() -> Self {
39-
ClusterInfo {
40-
broker_addr_table: HashMap::new(),
41-
cluster_addr_table: HashMap::new(),
42-
}
43-
}
44-
45-
pub fn new_with_var(
46-
broker_addr_table: HashMap<String, BrokerData>,
47-
cluster_addr_table: HashMap<String, HashSet<String>>,
48-
) -> Self {
36+
pub fn new(
37+
broker_addr_table: Option<HashMap<String, BrokerData>>,
38+
cluster_addr_table: Option<HashMap<String, HashSet<String>>>,
39+
) -> ClusterInfo {
4940
ClusterInfo {
5041
broker_addr_table,
5142
cluster_addr_table,
5243
}
5344
}
54-
55-
pub fn get_broker_addr_table(&self) -> &HashMap<String, BrokerData> {
56-
&self.broker_addr_table
57-
}
58-
59-
pub fn set_broker_addr_table(&mut self, broker_addr_table: HashMap<String, BrokerData>) {
60-
self.broker_addr_table = broker_addr_table;
61-
}
62-
63-
pub fn get_cluster_addr_table(&self) -> &HashMap<String, HashSet<String>> {
64-
&self.cluster_addr_table
65-
}
66-
67-
pub fn set_cluster_addr_table(&mut self, cluster_addr_table: HashMap<String, HashSet<String>>) {
68-
self.cluster_addr_table = cluster_addr_table;
69-
}
70-
71-
pub fn retrieve_all_addr_by_cluster(&self, cluster: &str) -> Vec<String> {
72-
let mut addrs = Vec::new();
73-
if let Some(broker_names) = self.cluster_addr_table.get(cluster) {
74-
for broker_name in broker_names {
75-
if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
76-
addrs.extend(broker_data.broker_addrs().values().cloned());
77-
}
78-
}
79-
}
80-
addrs
81-
}
82-
83-
pub fn retrieve_all_cluster_names(&self) -> Vec<String> {
84-
self.cluster_addr_table.keys().cloned().collect()
85-
}
86-
}
87-
88-
impl PartialEq for ClusterInfo {
89-
fn eq(&self, other: &Self) -> bool {
90-
self.broker_addr_table == other.broker_addr_table
91-
&& self.cluster_addr_table == other.cluster_addr_table
92-
}
93-
}
94-
95-
impl Eq for ClusterInfo {}
96-
97-
impl Hash for ClusterInfo {
98-
fn hash<H: Hasher>(&self, state: &mut H) {
99-
self.broker_addr_table.hash(state);
100-
self.cluster_addr_table.hash(state);
101-
}
10245
}

rocketmq-remoting/src/protocol/header.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@ pub mod broker;
1818
pub mod client_request_header;
1919
pub mod get_consumer_listby_group_request_header;
2020
pub mod get_consumer_listby_group_response_header;
21+
pub mod get_earliest_msg_storetime_response_header;
22+
pub mod get_max_offset_response_header;
23+
pub mod get_min_offset_response_header;
2124
pub mod message_operation_header;
2225
pub mod namesrv;
2326
pub mod pull_message_request_header;
2427
pub mod pull_message_response_header;
2528
pub mod query_consumer_offset_request_header;
2629
pub mod query_consumer_offset_response_header;
30+
pub mod search_offset_response_header;
2731
pub mod unregister_client_request_header;
2832
pub mod update_consumer_offset_header;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use rocketmq_macros::RequestHeaderCodec;
18+
use serde::Deserialize;
19+
use serde::Serialize;
20+
21+
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodec)]
22+
pub struct GetEarliestMsgStoretimeResponseHeader {
23+
pub timestamp: i64,
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use rocketmq_macros::RequestHeaderCodec;
18+
use serde::Deserialize;
19+
use serde::Serialize;
20+
21+
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodec)]
22+
pub struct GetMaxOffsetResponseHeader {
23+
pub offset: i64,
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use rocketmq_macros::RequestHeaderCodec;
18+
use serde::Deserialize;
19+
use serde::Serialize;
20+
21+
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodec)]
22+
pub struct GetMinOffsetResponseHeader {
23+
pub offset: i64,
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use rocketmq_macros::RequestHeaderCodec;
18+
use serde::Deserialize;
19+
use serde::Serialize;
20+
21+
#[derive(Debug, Clone, Deserialize, Serialize, Default, RequestHeaderCodec)]
22+
pub struct SearchOffsetResponseHeader {
23+
pub offset: i64,
24+
}

0 commit comments

Comments
 (0)