Skip to content

Commit c4aa8ff

Browse files
authored
[ISSUE #1570]🚀Add ConsumeMessageDirectlyResultRequestHeader struct🔥 (#1571)
1 parent 6f2ab13 commit c4aa8ff

2 files changed

Lines changed: 114 additions & 0 deletions

File tree

rocketmq-remoting/src/protocol/header.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
pub mod broker;
1818
pub mod check_transaction_state_request_header;
1919
pub mod client_request_header;
20+
pub mod consume_message_directly_result_request_header;
2021
pub mod consumer_send_msg_back_request_header;
2122
pub mod create_topic_request_header;
2223
pub mod delete_topic_request_header;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use cheetah_string::CheetahString;
18+
use rocketmq_macros::RequestHeaderCodec;
19+
use serde::Deserialize;
20+
use serde::Serialize;
21+
22+
use crate::rpc::rpc_request_header::RpcRequestHeader;
23+
24+
#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodec)]
25+
#[serde(rename_all = "camelCase")]
26+
pub struct ConsumeMessageDirectlyResultRequestHeader {
27+
#[required]
28+
pub consumer_group: CheetahString,
29+
pub client_id: Option<CheetahString>,
30+
pub msg_id: Option<CheetahString>,
31+
pub broker_name: Option<CheetahString>,
32+
pub topic: Option<CheetahString>,
33+
pub topic_sys_flag: Option<i32>,
34+
pub group_sys_flag: Option<i32>,
35+
#[serde(flatten)]
36+
pub rpc_request_header: Option<RpcRequestHeader>,
37+
}
38+
39+
#[cfg(test)]
40+
mod tests {
41+
use cheetah_string::CheetahString;
42+
43+
use super::*;
44+
45+
#[test]
46+
fn consume_message_directly_result_request_header_serializes_correctly() {
47+
let header = ConsumeMessageDirectlyResultRequestHeader {
48+
consumer_group: CheetahString::from_static_str("test_group"),
49+
client_id: Some(CheetahString::from_static_str("client_id")),
50+
msg_id: Some(CheetahString::from_static_str("msg_id")),
51+
broker_name: Some(CheetahString::from_static_str("broker_name")),
52+
topic: Some(CheetahString::from_static_str("topic")),
53+
topic_sys_flag: Some(1),
54+
group_sys_flag: Some(2),
55+
rpc_request_header: None,
56+
};
57+
let serialized = serde_json::to_string(&header).unwrap();
58+
let expected = r#"{"consumerGroup":"test_group","clientId":"client_id","msgId":"msg_id","brokerName":"broker_name","topic":"topic","topicSysFlag":1,"groupSysFlag":2}"#;
59+
assert_eq!(serialized, expected);
60+
}
61+
62+
#[test]
63+
fn consume_message_directly_result_request_header_deserializes_correctly() {
64+
let data = r#"{"consumerGroup":"test_group","clientId":"client_id","msgId":"msg_id","brokerName":"broker_name","topic":"topic","topicSysFlag":1,"groupSysFlag":2}"#;
65+
let header: ConsumeMessageDirectlyResultRequestHeader = serde_json::from_str(data).unwrap();
66+
assert_eq!(
67+
header.consumer_group,
68+
CheetahString::from_static_str("test_group")
69+
);
70+
assert_eq!(
71+
header.client_id.unwrap(),
72+
CheetahString::from_static_str("client_id")
73+
);
74+
assert_eq!(
75+
header.msg_id.unwrap(),
76+
CheetahString::from_static_str("msg_id")
77+
);
78+
assert_eq!(
79+
header.broker_name.unwrap(),
80+
CheetahString::from_static_str("broker_name")
81+
);
82+
assert_eq!(
83+
header.topic.unwrap(),
84+
CheetahString::from_static_str("topic")
85+
);
86+
assert_eq!(header.topic_sys_flag.unwrap(), 1);
87+
assert_eq!(header.group_sys_flag.unwrap(), 2);
88+
}
89+
90+
#[test]
91+
fn consume_message_directly_result_request_header_handles_missing_optional_fields() {
92+
let data = r#"{"consumerGroup":"test_group"}"#;
93+
let header: ConsumeMessageDirectlyResultRequestHeader = serde_json::from_str(data).unwrap();
94+
assert_eq!(
95+
header.consumer_group,
96+
CheetahString::from_static_str("test_group")
97+
);
98+
assert!(header.client_id.is_none());
99+
assert!(header.msg_id.is_none());
100+
assert!(header.broker_name.is_none());
101+
assert!(header.topic.is_none());
102+
assert!(header.topic_sys_flag.is_none());
103+
assert!(header.group_sys_flag.is_none());
104+
}
105+
106+
#[test]
107+
fn consume_message_directly_result_request_header_handles_invalid_data() {
108+
let data = r#"{"consumerGroup":12345}"#;
109+
let result: Result<ConsumeMessageDirectlyResultRequestHeader, _> =
110+
serde_json::from_str(data);
111+
assert!(result.is_err());
112+
}
113+
}

0 commit comments

Comments
 (0)