Skip to content

Commit 35e564c

Browse files
committed
[ISSUE #1522]♻️Refactor UpdateConsumerOffsetRequestHeader with derive macro RequestHeaderCodec
1 parent b4b6c59 commit 35e564c

3 files changed

Lines changed: 152 additions & 31 deletions

File tree

rocketmq-broker/src/processor/consumer_manage_processor.rs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -202,41 +202,41 @@ where
202202
);
203203
}
204204

205-
if queue_id.is_none() {
206-
return Some(
207-
response
208-
.set_code(ResponseCode::SystemError)
209-
.set_remark(format!("QueueId is null, topic is {}", topic)),
210-
);
211-
}
212-
if offset.is_none() {
213-
return Some(
214-
response
215-
.set_code(ResponseCode::SystemError)
216-
.set_remark(format!("Offset is null, topic is {}", topic)),
217-
);
218-
}
205+
// if queue_id.is_none() {
206+
// return Some(
207+
// response
208+
// .set_code(ResponseCode::SystemError)
209+
// .set_remark(format!("QueueId is null, topic is {}", topic)),
210+
// );
211+
// }
212+
// if offset.is_none() {
213+
// return Some(
214+
// response
215+
// .set_code(ResponseCode::SystemError)
216+
// .set_remark(format!("Offset is null, topic is {}", topic)),
217+
// );
218+
// }
219219
if self.broker_config.use_server_side_reset_offset
220220
&& self
221221
.consumer_offset_manager
222-
.has_offset_reset(topic, group, queue_id.unwrap())
222+
.has_offset_reset(topic, group, queue_id)
223223
{
224224
info!(
225225
"Update consumer offset is rejected because of previous offset-reset. \
226226
Group={},Topic={}, QueueId={}, Offset={}",
227227
topic,
228228
group,
229-
queue_id.unwrap(),
230-
offset.unwrap()
229+
queue_id,
230+
offset
231231
);
232232
return Some(response.set_remark("Offset has been previously reset"));
233233
}
234234
self.consumer_offset_manager.commit_offset(
235235
channel.remote_address(),
236236
group,
237237
topic,
238-
queue_id.unwrap(),
239-
offset.unwrap(),
238+
queue_id,
239+
offset,
240240
);
241241
Some(response)
242242
}

rocketmq-client/src/consumer/store/remote_broker_offset_store.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,8 @@ impl OffsetStoreTrait for RemoteBrokerOffsetStore {
282282
let request_header = UpdateConsumerOffsetRequestHeader {
283283
consumer_group: self.group_name.clone(),
284284
topic: mq.get_topic_cs().clone(),
285-
queue_id: Some(mq.get_queue_id()),
286-
commit_offset: Some(offset),
285+
queue_id: mq.get_queue_id(),
286+
commit_offset: offset,
287287
topic_request_header: Some(TopicRequestHeader {
288288
lo: None,
289289
rpc: Some(RpcRequestHeader {

rocketmq-remoting/src/protocol/header/update_consumer_offset_header.rs

Lines changed: 131 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,33 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
use std::collections::HashMap;
1817

1918
use cheetah_string::CheetahString;
2019
use rocketmq_macros::RequestHeaderCodec;
2120
use serde::Deserialize;
2221
use serde::Serialize;
2322

24-
use crate::protocol::command_custom_header::CommandCustomHeader;
25-
use crate::protocol::command_custom_header::FromMap;
2623
use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
2724
use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
2825

2926
#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodec)]
3027
pub struct UpdateConsumerOffsetResponseHeader {}
3128

32-
#[derive(Debug, Serialize, Deserialize, Default)]
29+
#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodec)]
3330
#[serde(rename_all = "camelCase")]
3431
pub struct UpdateConsumerOffsetRequestHeader {
32+
#[required]
3533
pub consumer_group: CheetahString,
34+
#[required]
3635
pub topic: CheetahString,
37-
pub queue_id: Option<i32>,
38-
pub commit_offset: Option<i64>,
36+
#[required]
37+
pub queue_id: i32,
38+
#[required]
39+
pub commit_offset: i64,
3940
#[serde(flatten)]
4041
pub topic_request_header: Option<TopicRequestHeader>,
4142
}
42-
impl UpdateConsumerOffsetRequestHeader {
43+
/*impl UpdateConsumerOffsetRequestHeader {
4344
pub const CONSUMER_GROUP: &'static str = "consumerGroup";
4445
pub const TOPIC: &'static str = "topic";
4546
pub const QUEUE_ID: &'static str = "queueId";
@@ -111,7 +112,7 @@ impl FromMap for UpdateConsumerOffsetRequestHeader {
111112
topic_request_header: Some(<TopicRequestHeader as FromMap>::from(map)?),
112113
})
113114
}
114-
}
115+
}*/
115116

116117
impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader {
117118
fn set_lo(&mut self, lo: Option<bool>) {
@@ -213,10 +214,130 @@ impl TopicRequestHeaderTrait for UpdateConsumerOffsetRequestHeader {
213214
}
214215

215216
fn queue_id(&self) -> Option<i32> {
216-
self.queue_id
217+
Some(self.queue_id)
217218
}
218219

219220
fn set_queue_id(&mut self, queue_id: Option<i32>) {
220-
self.queue_id = queue_id;
221+
self.queue_id = queue_id.unwrap();
222+
}
223+
}
224+
225+
#[cfg(test)]
226+
mod tests {
227+
use std::collections::HashMap;
228+
229+
use cheetah_string::CheetahString;
230+
231+
use super::*;
232+
use crate::protocol::command_custom_header::CommandCustomHeader;
233+
use crate::protocol::command_custom_header::FromMap;
234+
235+
#[test]
236+
fn update_consumer_offset_request_header_serializes_correctly() {
237+
let header = UpdateConsumerOffsetRequestHeader {
238+
consumer_group: CheetahString::from_static_str("test_consumer_group"),
239+
topic: CheetahString::from_static_str("test_topic"),
240+
queue_id: 1,
241+
commit_offset: 100,
242+
topic_request_header: None,
243+
};
244+
let map = header.to_map().unwrap();
245+
assert_eq!(
246+
map.get(&CheetahString::from_static_str("consumerGroup"))
247+
.unwrap(),
248+
"test_consumer_group"
249+
);
250+
assert_eq!(
251+
map.get(&CheetahString::from_static_str("topic")).unwrap(),
252+
"test_topic"
253+
);
254+
assert_eq!(
255+
map.get(&CheetahString::from_static_str("queueId")).unwrap(),
256+
"1"
257+
);
258+
assert_eq!(
259+
map.get(&CheetahString::from_static_str("commitOffset"))
260+
.unwrap(),
261+
"100"
262+
);
263+
}
264+
265+
#[test]
266+
fn update_consumer_offset_request_header_deserializes_correctly() {
267+
let mut map = HashMap::new();
268+
map.insert(
269+
CheetahString::from_static_str("consumerGroup"),
270+
CheetahString::from_static_str("test_consumer_group"),
271+
);
272+
map.insert(
273+
CheetahString::from_static_str("topic"),
274+
CheetahString::from_static_str("test_topic"),
275+
);
276+
map.insert(
277+
CheetahString::from_static_str("queueId"),
278+
CheetahString::from_static_str("1"),
279+
);
280+
map.insert(
281+
CheetahString::from_static_str("commitOffset"),
282+
CheetahString::from_static_str("100"),
283+
);
284+
285+
let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();
286+
assert_eq!(header.consumer_group, "test_consumer_group");
287+
assert_eq!(header.topic, "test_topic");
288+
assert_eq!(header.queue_id, 1);
289+
assert_eq!(header.commit_offset, 100);
290+
}
291+
292+
#[test]
293+
fn update_consumer_offset_request_header_handles_missing_optional_fields() {
294+
let mut map = HashMap::new();
295+
map.insert(
296+
CheetahString::from_static_str("consumerGroup"),
297+
CheetahString::from_static_str("test_consumer_group"),
298+
);
299+
map.insert(
300+
CheetahString::from_static_str("topic"),
301+
CheetahString::from_static_str("test_topic"),
302+
);
303+
map.insert(
304+
CheetahString::from_static_str("queueId"),
305+
CheetahString::from_static_str("1"),
306+
);
307+
map.insert(
308+
CheetahString::from_static_str("commitOffset"),
309+
CheetahString::from_static_str("100"),
310+
);
311+
312+
let header = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map).unwrap();
313+
assert_eq!(header.consumer_group, "test_consumer_group");
314+
assert_eq!(header.topic, "test_topic");
315+
assert_eq!(header.queue_id, 1);
316+
assert_eq!(header.commit_offset, 100);
317+
assert!(header.topic_request_header.is_some());
318+
}
319+
320+
#[test]
321+
fn update_consumer_offset_request_header_handles_invalid_data() {
322+
let mut map = HashMap::new();
323+
map.insert(
324+
CheetahString::from_static_str("consumerGroup"),
325+
CheetahString::from_static_str("test_consumer_group"),
326+
);
327+
map.insert(
328+
CheetahString::from_static_str("topic"),
329+
CheetahString::from_static_str("test_topic"),
330+
);
331+
map.insert(
332+
CheetahString::from_static_str("queueId"),
333+
CheetahString::from_static_str("invalid"),
334+
);
335+
map.insert(
336+
CheetahString::from_static_str("commitOffset"),
337+
CheetahString::from_static_str("invalid"),
338+
);
339+
340+
let result = <UpdateConsumerOffsetRequestHeader as FromMap>::from(&map);
341+
assert!(result.is_err());
221342
}
222343
}

0 commit comments

Comments
 (0)