Skip to content

Commit fb328dd

Browse files
committed
[ISSUE #1089]🔥Optimize api according to rust api-guidelines🎨
1 parent fe0ccb3 commit fb328dd

15 files changed

Lines changed: 64 additions & 65 deletions

rocketmq-broker/src/out_api/broker_outer_api.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl BrokerOuterAPI {
117117
let topic_route_body = topic_route_data.encode();
118118

119119
RemotingCommand::create_request_command(RequestCode::RegisterTopicInNamesrv, request_header)
120-
.set_body(Some(topic_route_body))
120+
.set_body(topic_route_body)
121121
}
122122
}
123123

@@ -248,7 +248,7 @@ impl BrokerOuterAPI {
248248
);
249249
let request =
250250
RemotingCommand::create_request_command(RequestCode::RegisterBroker, request_header)
251-
.set_body(Some(body.clone()));
251+
.set_body(body.clone());
252252
if oneway {
253253
self.remoting_client
254254
.invoke_oneway(namesrv_addr, request, timeout_mills)
@@ -338,7 +338,7 @@ impl BrokerOuterAPI {
338338
RequestCode::LockBatchMq,
339339
LockBatchMqRequestHeader::default(),
340340
);
341-
request.set_body_mut_ref(Some(request_body));
341+
request.set_body_mut_ref(request_body);
342342
let result = self
343343
.remoting_client
344344
.invoke_async(Some(addr), request, timeout_millis)
@@ -371,7 +371,7 @@ impl BrokerOuterAPI {
371371
RequestCode::UnlockBatchMq,
372372
UnlockBatchMqRequestHeader::default(),
373373
);
374-
request.set_body_mut_ref(Some(request_body));
374+
request.set_body_mut_ref(request_body);
375375
let result = self
376376
.remoting_client
377377
.invoke_async(Some(addr), request, timeout_millis)

rocketmq-broker/src/processor/admin_broker_processor/batch_mq_handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ impl BatchMqHandler {
115115
let response_body = LockBatchResponseBody {
116116
lock_ok_mq_set: lock_ok_mqset,
117117
};
118-
Some(RemotingCommand::create_response_command().set_body(Some(response_body.encode())))
118+
Some(RemotingCommand::create_response_command().set_body(response_body.encode()))
119119
}
120120

121121
pub async fn unlock_batch_mq(

rocketmq-broker/src/processor/admin_broker_processor/broker_config_request_handler.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use std::collections::HashMap;
1919

20-
use bytes::Bytes;
2120
use rocketmq_common::common::mix_all;
2221
use rocketmq_common::common::mq_version::RocketMqVersion;
2322
use rocketmq_remoting::code::request_code::RequestCode;
@@ -78,7 +77,7 @@ impl BrokerConfigRequestHandler {
7877
body.push_str(&format!("{}:{}\n", key, value));
7978
}
8079
if !body.is_empty() {
81-
response.set_body_mut_ref(Some(Bytes::from(body)));
80+
response.set_body_mut_ref(body);
8281
}
8382
Some(response)
8483
}
@@ -95,9 +94,7 @@ impl BrokerConfigRequestHandler {
9594
let key_value_table = KVTable {
9695
table: runtime_info,
9796
};
98-
response.set_body_mut_ref(Some(Bytes::from(
99-
serde_json::to_string(&key_value_table).unwrap(),
100-
)));
97+
response.set_body_mut_ref(serde_json::to_string(&key_value_table).unwrap());
10198
Some(response)
10299
}
103100

rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl ConsumerRequestHandler {
8484
body_data.get_connection_set().insert(connection);
8585
}
8686
let body = body_data.encode();
87-
response.set_body_mut_ref(Some(body));
87+
response.set_body_mut_ref(body);
8888
Some(response)
8989
}
9090
None => Some(
@@ -214,7 +214,7 @@ impl ConsumerRequestHandler {
214214
consume_stats.set_consume_tps(new_consume_tps);
215215
}
216216
let body = consume_stats.encode();
217-
response.set_body_mut_ref(Some(body));
217+
response.set_body_mut_ref(body);
218218
Some(response)
219219
}
220220

@@ -228,7 +228,7 @@ impl ConsumerRequestHandler {
228228
let mut response = RemotingCommand::create_response_command();
229229
let content = self.inner.consumer_offset_manager.encode();
230230
if !content.is_empty() {
231-
response.set_body_mut_ref(Some(content));
231+
response.set_body_mut_ref(content);
232232
Some(response)
233233
} else {
234234
Some(

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use std::collections::HashMap;
1919
use std::collections::HashSet;
2020

21-
use bytes::Bytes;
2221
use rocketmq_common::common::attribute::attribute_parser::AttributeParser;
2322
use rocketmq_common::common::attribute::topic_message_type::TopicMessageType;
2423
use rocketmq_common::common::config::TopicConfig;
@@ -398,7 +397,7 @@ impl TopicRequestHandler {
398397
};
399398
let content = topic_config_and_mapping_serialize_wrapper.to_json();
400399
if !content.is_empty() {
401-
response.set_body_mut_ref(Some(Bytes::from(content)));
400+
response.set_body_mut_ref(content);
402401
}
403402
Some(response)
404403
}
@@ -416,7 +415,7 @@ impl TopicRequestHandler {
416415
topic_list: topics.into_iter().map(|s| s.to_string()).collect(),
417416
broker_addr: None,
418417
};
419-
response.set_body_mut_ref(Some(topic_list.encode()));
418+
response.set_body_mut_ref(topic_list.encode());
420419
Some(response)
421420
}
422421

@@ -477,7 +476,7 @@ impl TopicRequestHandler {
477476
map.insert(message_queue, topic_offset);
478477
}
479478
topic_stats_table.set_offset_table(map);
480-
response.set_body_mut_ref(Some(topic_stats_table.encode()));
479+
response.set_body_mut_ref(topic_stats_table.encode());
481480
Some(response)
482481
}
483482

@@ -517,7 +516,7 @@ impl TopicRequestHandler {
517516
}
518517
let topic_config_and_queue_mapping =
519518
TopicConfigAndQueueMapping::new(topic_config.unwrap(), topic_queue_mapping_detail);
520-
response.set_body_mut_ref(Some(topic_config_and_queue_mapping.encode()));
519+
response.set_body_mut_ref(topic_config_and_queue_mapping.encode());
521520
Some(response)
522521
}
523522

@@ -540,7 +539,7 @@ impl TopicRequestHandler {
540539
.which_group_by_topic(topic);
541540
groups.extend(group_in_offset.clone());
542541
let group_list = GroupList { group_list: groups };
543-
response.set_body_mut_ref(Some(group_list.encode()));
542+
response.set_body_mut_ref(group_list.encode());
544543
Some(response)
545544
}
546545

@@ -567,7 +566,7 @@ impl TopicRequestHandler {
567566
topic_list: topics.into_iter().collect(),
568567
broker_addr: Some(broker_addr),
569568
};
570-
response.set_body_mut_ref(Some(topic_list.encode()));
569+
response.set_body_mut_ref(topic_list.encode());
571570
Some(response)
572571
}
573572

rocketmq-broker/src/processor/consumer_manage_processor.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use std::sync::Arc;
1919

20-
use bytes::Bytes;
2120
use rocketmq_common::common::broker::broker_config::BrokerConfig;
2221
use rocketmq_remoting::code::request_code::RequestCode;
2322
use rocketmq_remoting::code::response_code::ResponseCode;
@@ -137,7 +136,7 @@ where
137136
};
138137
return Some(
139138
response
140-
.set_body(Some(Bytes::from(body.encode())))
139+
.set_body(body.encode())
141140
.set_code(ResponseCode::Success),
142141
);
143142
} else {

rocketmq-broker/src/processor/default_pull_message_result_handler.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,10 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
199199
request_header.topic.as_str(),
200200
request_header.queue_id.unwrap(),
201201
);
202-
Some(response.set_body(body))
202+
if let Some(body) = body {
203+
response.set_body_mut_ref(body);
204+
}
205+
Some(response)
203206
} else {
204207
None
205208
}

rocketmq-broker/src/processor/query_message_processor.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ where
9999

100100
if query_message_result.buffer_total_size > 0 {
101101
let message_data = query_message_result.get_message_data();
102-
return Some(response.set_body(message_data));
102+
if let Some(body) = message_data {
103+
response.set_body_mut_ref(body);
104+
}
105+
return Some(response);
103106
}
104107
Some(
105108
response
@@ -116,7 +119,7 @@ where
116119
_ctx: ConnectionHandlerContext,
117120
request: RemotingCommand,
118121
) -> Option<RemotingCommand> {
119-
let response = RemotingCommand::create_response_command();
122+
let mut response = RemotingCommand::create_response_command();
120123
let request_header = request
121124
.decode_command_custom_header::<ViewMessageRequestHeader>()
122125
.unwrap();
@@ -126,7 +129,10 @@ where
126129
.await;
127130
if let Some(result) = select_mapped_buffer_result {
128131
let message_data = result.get_bytes();
129-
return Some(response.set_body(message_data));
132+
if let Some(body) = message_data {
133+
response.set_body_mut_ref(body)
134+
}
135+
return Some(response);
130136
}
131137
Some(
132138
response

rocketmq-broker/src/processor/reply_message_processor.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,13 @@ impl<MS: MessageStore> ReplyMessageProcessor<MS> {
370370
unit_mode: request_header.unit_mode,
371371
..Default::default()
372372
};
373-
let command = RemotingCommand::create_request_command(
373+
let mut command = RemotingCommand::create_request_command(
374374
RequestCode::PushReplyMessageToClient,
375375
reply_message_request_header,
376-
)
377-
.set_body(msg.get_body().cloned());
376+
);
377+
if let Some(body) = msg.get_body().cloned() {
378+
command.set_body_mut_ref(body);
379+
}
378380
let sender_id = msg.get_property(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT);
379381
let mut push_reply_result = PushReplyResult(false, "".to_string());
380382
if let Some(sender_id) = sender_id {

rocketmq-client/src/implementation/mq_client_api_impl.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use std::sync::atomic::AtomicU32;
1919
use std::sync::Arc;
2020
use std::time::Instant;
2121

22-
use bytes::Bytes;
2322
use lazy_static::lazy_static;
2423
use rocketmq_common::common::message::message_batch::MessageBatch;
2524
use rocketmq_common::common::message::message_client_id_setter::MessageClientIDSetter;
@@ -319,9 +318,9 @@ impl MQClientAPIImpl {
319318
// if compressed_body is not None, set request body to compressed_body
320319
if msg.get_compressed_body_mut().is_some() {
321320
let compressed_body = std::mem::take(msg.get_compressed_body_mut());
322-
request.set_body_mut_ref(compressed_body);
321+
request.set_body_mut_ref(compressed_body.unwrap());
323322
} else {
324-
request.set_body_mut_ref(msg.get_body().cloned());
323+
request.set_body_mut_ref(msg.get_body().cloned().unwrap());
325324
}
326325
match communication_mode {
327326
CommunicationMode::Sync => {
@@ -657,7 +656,7 @@ impl MQClientAPIImpl {
657656
HeartbeatRequestHeader::default(),
658657
)
659658
.set_language(self.client_config.language)
660-
.set_body(Some(Bytes::from(heartbeat_data.encode())));
659+
.set_body(heartbeat_data.encode());
661660
let response = self
662661
.remoting_client
663662
.invoke_async(Some(addr.to_string()), request, timeout_millis)
@@ -686,7 +685,7 @@ impl MQClientAPIImpl {
686685
consumer_group.to_string(),
687686
subscription_data.clone(),
688687
);
689-
request.set_body_mut_ref(Some(body.encode()));
688+
request.set_body_mut_ref(body.encode());
690689
let response = self
691690
.remoting_client
692691
.invoke_async(
@@ -1059,7 +1058,7 @@ impl MQClientAPIImpl {
10591058
RequestCode::UnlockBatchMq,
10601059
UnlockBatchMqRequestHeader::default(),
10611060
);
1062-
request.set_body_mut_ref(Some(request_body.encode()));
1061+
request.set_body_mut_ref(request_body.encode());
10631062
if oneway {
10641063
self.remoting_client
10651064
.invoke_oneway(addr.to_string(), request, timeout_millis)
@@ -1099,7 +1098,7 @@ impl MQClientAPIImpl {
10991098
RequestCode::LockBatchMq,
11001099
LockBatchMqRequestHeader::default(),
11011100
);
1102-
request.set_body_mut_ref(Some(request_body.encode()));
1101+
request.set_body_mut_ref(request_body.encode());
11031102
let response = self
11041103
.remoting_client
11051104
.invoke_async(

0 commit comments

Comments
 (0)