|
14 | 14 | * See the License for the specific language governing permissions and |
15 | 15 | * limitations under the License. |
16 | 16 | */ |
17 | | -use std::collections::HashMap; |
18 | 17 |
|
19 | 18 | use cheetah_string::CheetahString; |
20 | 19 | use rocketmq_macros::RequestHeaderCodec; |
21 | 20 | use serde::Deserialize; |
22 | 21 | use serde::Serialize; |
23 | 22 |
|
24 | | -use crate::protocol::command_custom_header::CommandCustomHeader; |
25 | | -use crate::protocol::command_custom_header::FromMap; |
26 | 23 | use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait; |
27 | 24 | use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader; |
28 | 25 |
|
29 | 26 | #[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodec)] |
30 | 27 | pub struct UpdateConsumerOffsetResponseHeader {} |
31 | 28 |
|
32 | | -#[derive(Debug, Serialize, Deserialize, Default)] |
| 29 | +#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodec)] |
33 | 30 | #[serde(rename_all = "camelCase")] |
34 | 31 | pub struct UpdateConsumerOffsetRequestHeader { |
| 32 | + #[required] |
35 | 33 | pub consumer_group: CheetahString, |
| 34 | + #[required] |
36 | 35 | pub topic: CheetahString, |
37 | | - pub queue_id: Option<i32>, |
38 | | - pub commit_offset: Option<i64>, |
| 36 | + #[required] |
| 37 | + pub queue_id: i32, |
| 38 | + #[required] |
| 39 | + pub commit_offset: i64, |
39 | 40 | #[serde(flatten)] |
40 | 41 | pub topic_request_header: Option<TopicRequestHeader>, |
41 | 42 | } |
42 | | -impl UpdateConsumerOffsetRequestHeader { |
| 43 | +/*impl UpdateConsumerOffsetRequestHeader { |
43 | 44 | pub const CONSUMER_GROUP: &'static str = "consumerGroup"; |
44 | 45 | pub const TOPIC: &'static str = "topic"; |
45 | 46 | pub const QUEUE_ID: &'static str = "queueId"; |
@@ -111,7 +112,7 @@ impl FromMap for UpdateConsumerOffsetRequestHeader { |
111 | 112 | topic_request_header: Some(<TopicRequestHeader as FromMap>::from(map)?), |
112 | 113 | }) |
113 | 114 | } |
114 | | -} |
| 115 | +}*/ |
115 | 116 |
|
116 | 117 | impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader { |
117 | 118 | fn set_lo(&mut self, lo: Option<bool>) { |
@@ -213,10 +214,130 @@ impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader { |
213 | 214 | } |
214 | 215 |
|
215 | 216 | fn queue_id(&self) -> Option<i32> { |
216 | | - self.queue_id |
| 217 | + Some(self.queue_id) |
217 | 218 | } |
218 | 219 |
|
219 | 220 | fn set_queue_id(&mut self, queue_id: Option<i32>) { |
220 | | - self.queue_id = queue_id; |
| 221 | + self.queue_id = queue_id.unwrap(); |
| 222 | + } |
| 223 | +} |
| 224 | + |
| 225 | +#[cfg(test)] |
| 226 | +mod tests { |
| 227 | + use std::collections::HashMap; |
| 228 | + |
| 229 | + use cheetah_string::CheetahString; |
| 230 | + |
| 231 | + use super::*; |
| 232 | + use crate::protocol::command_custom_header::CommandCustomHeader; |
| 233 | + use crate::protocol::command_custom_header::FromMap; |
| 234 | + |
| 235 | + #[test] |
| 236 | + fn update_consumer_offset_request_header_serializes_correctly() { |
| 237 | + let header = UpdateConsumerOffsetRequestHeader { |
| 238 | + consumer_group: CheetahString::from_static_str("test_consumer_group"), |
| 239 | + topic: CheetahString::from_static_str("test_topic"), |
| 240 | + queue_id: 1, |
| 241 | + commit_offset: 100, |
| 242 | + topic_request_header: None, |
| 243 | + }; |
| 244 | + let map = header.to_map().unwrap(); |
| 245 | + assert_eq!( |
| 246 | + map.get(&CheetahString::from_static_str("consumerGroup")) |
| 247 | + .unwrap(), |
| 248 | + "test_consumer_group" |
| 249 | + ); |
| 250 | + assert_eq!( |
| 251 | + map.get(&CheetahString::from_static_str("topic")).unwrap(), |
| 252 | + "test_topic" |
| 253 | + ); |
| 254 | + assert_eq!( |
| 255 | + map.get(&CheetahString::from_static_str("queueId")).unwrap(), |
| 256 | + "1" |
| 257 | + ); |
| 258 | + assert_eq!( |
| 259 | + map.get(&CheetahString::from_static_str("commitOffset")) |
| 260 | + .unwrap(), |
| 261 | + "100" |
| 262 | + ); |
| 263 | + } |
| 264 | + |
| 265 | + #[test] |
| 266 | + fn update_consumer_offset_request_header_deserializes_correctly() { |
| 267 | + let mut map = HashMap::new(); |
| 268 | + map.insert( |
| 269 | + CheetahString::from_static_str("consumerGroup"), |
| 270 | + CheetahString::from_static_str("test_consumer_group"), |
| 271 | + ); |
| 272 | + map.insert( |
| 273 | + CheetahString::from_static_str("topic"), |
| 274 | + CheetahString::from_static_str("test_topic"), |
| 275 | + ); |
| 276 | + map.insert( |
| 277 | + CheetahString::from_static_str("queueId"), |
| 278 | + CheetahString::from_static_str("1"), |
| 279 | + ); |
| 280 | + map.insert( |
| 281 | + CheetahString::from_static_str("commitOffset"), |
| 282 | + CheetahString::from_static_str("100"), |
| 283 | + ); |
| 284 | + |
| 285 | + let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap(); |
| 286 | + assert_eq!(header.consumer_group, "test_consumer_group"); |
| 287 | + assert_eq!(header.topic, "test_topic"); |
| 288 | + assert_eq!(header.queue_id, 1); |
| 289 | + assert_eq!(header.commit_offset, 100); |
| 290 | + } |
| 291 | + |
| 292 | + #[test] |
| 293 | + fn update_consumer_offset_request_header_handles_missing_optional_fields() { |
| 294 | + let mut map = HashMap::new(); |
| 295 | + map.insert( |
| 296 | + CheetahString::from_static_str("consumerGroup"), |
| 297 | + CheetahString::from_static_str("test_consumer_group"), |
| 298 | + ); |
| 299 | + map.insert( |
| 300 | + CheetahString::from_static_str("topic"), |
| 301 | + CheetahString::from_static_str("test_topic"), |
| 302 | + ); |
| 303 | + map.insert( |
| 304 | + CheetahString::from_static_str("queueId"), |
| 305 | + CheetahString::from_static_str("1"), |
| 306 | + ); |
| 307 | + map.insert( |
| 308 | + CheetahString::from_static_str("commitOffset"), |
| 309 | + CheetahString::from_static_str("100"), |
| 310 | + ); |
| 311 | + |
| 312 | + let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap(); |
| 313 | + assert_eq!(header.consumer_group, "test_consumer_group"); |
| 314 | + assert_eq!(header.topic, "test_topic"); |
| 315 | + assert_eq!(header.queue_id, 1); |
| 316 | + assert_eq!(header.commit_offset, 100); |
| 317 | + assert!(header.topic_request_header.is_some()); |
| 318 | + } |
| 319 | + |
| 320 | + #[test] |
| 321 | + fn update_consumer_offset_request_header_handles_invalid_data() { |
| 322 | + let mut map = HashMap::new(); |
| 323 | + map.insert( |
| 324 | + CheetahString::from_static_str("consumerGroup"), |
| 325 | + CheetahString::from_static_str("test_consumer_group"), |
| 326 | + ); |
| 327 | + map.insert( |
| 328 | + CheetahString::from_static_str("topic"), |
| 329 | + CheetahString::from_static_str("test_topic"), |
| 330 | + ); |
| 331 | + map.insert( |
| 332 | + CheetahString::from_static_str("queueId"), |
| 333 | + CheetahString::from_static_str("invalid"), |
| 334 | + ); |
| 335 | + map.insert( |
| 336 | + CheetahString::from_static_str("commitOffset"), |
| 337 | + CheetahString::from_static_str("invalid"), |
| 338 | + ); |
| 339 | + |
| 340 | + let result = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map); |
| 341 | + assert!(result.is_err()); |
221 | 342 | } |
222 | 343 | } |
0 commit comments