@@ -19,7 +19,6 @@ use std::sync::atomic::AtomicU32;
1919use std:: sync:: Arc ;
2020use std:: time:: Instant ;
2121
22- use bytes:: Bytes ;
2322use lazy_static:: lazy_static;
2423use rocketmq_common:: common:: message:: message_batch:: MessageBatch ;
2524use rocketmq_common:: common:: message:: message_client_id_setter:: MessageClientIDSetter ;
@@ -319,9 +318,9 @@ impl MQClientAPIImpl {
319318 // if compressed_body is not None, set request body to compressed_body
320319 if msg. get_compressed_body_mut ( ) . is_some ( ) {
321320 let compressed_body = std:: mem:: take ( msg. get_compressed_body_mut ( ) ) ;
322- request. set_body_mut_ref ( compressed_body) ;
321+ request. set_body_mut_ref ( compressed_body. unwrap ( ) ) ;
323322 } else {
324- request. set_body_mut_ref ( msg. get_body ( ) . cloned ( ) ) ;
323+ request. set_body_mut_ref ( msg. get_body ( ) . cloned ( ) . unwrap ( ) ) ;
325324 }
326325 match communication_mode {
327326 CommunicationMode :: Sync => {
@@ -657,7 +656,7 @@ impl MQClientAPIImpl {
657656 HeartbeatRequestHeader :: default ( ) ,
658657 )
659658 . set_language ( self . client_config . language )
660- . set_body ( Some ( Bytes :: from ( heartbeat_data. encode ( ) ) ) ) ;
659+ . set_body ( heartbeat_data. encode ( ) ) ;
661660 let response = self
662661 . remoting_client
663662 . invoke_async ( Some ( addr. to_string ( ) ) , request, timeout_millis)
@@ -686,7 +685,7 @@ impl MQClientAPIImpl {
686685 consumer_group. to_string ( ) ,
687686 subscription_data. clone ( ) ,
688687 ) ;
689- request. set_body_mut_ref ( Some ( body. encode ( ) ) ) ;
688+ request. set_body_mut_ref ( body. encode ( ) ) ;
690689 let response = self
691690 . remoting_client
692691 . invoke_async (
@@ -1059,7 +1058,7 @@ impl MQClientAPIImpl {
10591058 RequestCode :: UnlockBatchMq ,
10601059 UnlockBatchMqRequestHeader :: default ( ) ,
10611060 ) ;
1062- request. set_body_mut_ref ( Some ( request_body. encode ( ) ) ) ;
1061+ request. set_body_mut_ref ( request_body. encode ( ) ) ;
10631062 if oneway {
10641063 self . remoting_client
10651064 . invoke_oneway ( addr. to_string ( ) , request, timeout_millis)
@@ -1099,7 +1098,7 @@ impl MQClientAPIImpl {
10991098 RequestCode :: LockBatchMq ,
11001099 LockBatchMqRequestHeader :: default ( ) ,
11011100 ) ;
1102- request. set_body_mut_ref ( Some ( request_body. encode ( ) ) ) ;
1101+ request. set_body_mut_ref ( request_body. encode ( ) ) ;
11031102 let response = self
11041103 . remoting_client
11051104 . invoke_async (
0 commit comments