1414 * See the License for the specific language governing permissions and
1515 * limitations under the License.
1616 */
17+ use std:: cell:: SyncUnsafeCell ;
1718use std:: collections:: HashMap ;
1819use std:: sync:: atomic:: AtomicI32 ;
1920use std:: sync:: atomic:: Ordering ;
2021use std:: sync:: Arc ;
2122use std:: sync:: Once ;
2223use std:: sync:: RwLock ;
2324
25+ use bytes:: BufMut ;
2426use bytes:: Bytes ;
27+ use bytes:: BytesMut ;
2528use lazy_static:: lazy_static;
2629use rocketmq_common:: common:: mq_version:: RocketMqVersion ;
2730use serde:: Deserialize ;
2831use serde:: Serialize ;
32+ use tracing:: error;
2933
3034use super :: FastCodesHeader ;
3135use super :: RemotingCommandType ;
@@ -87,7 +91,8 @@ pub struct RemotingCommand {
8791 #[ serde( skip) ]
8892 suspended : bool ,
8993 #[ serde( skip) ]
90- command_custom_header : Option < Arc < dyn CommandCustomHeader + Send + Sync + ' static > > ,
94+ command_custom_header :
95+ Option < Arc < SyncUnsafeCell < dyn CommandCustomHeader + Send + Sync + ' static > > > ,
9196 #[ serde( rename = "serializeTypeCurrentRPC" ) ]
9297 serialize_type : SerializeType ,
9398}
@@ -135,10 +140,10 @@ impl RemotingCommand {
135140}
136141
137142impl RemotingCommand {
138- pub fn create_request_command (
139- code : impl Into < i32 > ,
140- header : impl CommandCustomHeader + Sync + Send + ' static ,
141- ) -> Self {
143+ pub fn create_request_command < T > ( code : impl Into < i32 > , header : T ) -> Self
144+ where
145+ T : CommandCustomHeader + Sync + Send + ' static ,
146+ {
142147 let mut command = Self :: default ( )
143148 . set_code ( code. into ( ) )
144149 . set_command_custom_header ( header) ;
@@ -188,13 +193,12 @@ impl RemotingCommand {
188193 . mark_response_type ( )
189194 }
190195
191- pub fn set_command_custom_header (
192- mut self ,
193- //command_custom_header: Option<Box<dyn CommandCustomHeader + Send + 'static>>,
194- command_custom_header : impl CommandCustomHeader + Send + Sync + ' static ,
195- ) -> Self {
196- self . command_custom_header = Some ( Arc :: new ( command_custom_header) ) ;
197- if let Some ( cch) = & self . command_custom_header {
196+ pub fn set_command_custom_header < T > ( mut self , command_custom_header : T ) -> Self
197+ where
198+ T : CommandCustomHeader + Sync + Send + ' static ,
199+ {
200+ self . command_custom_header = Some ( Arc :: new ( SyncUnsafeCell :: new ( command_custom_header) ) ) ;
201+ /*if let Some(cch) = &self.command_custom_header {
198202 let option = cch.to_map();
199203
200204 match &mut self.ext_fields {
@@ -209,16 +213,16 @@ impl RemotingCommand {
209213 }
210214 }
211215 }
212- }
216+ }*/
213217 self
214218 }
215219
216- pub fn set_command_custom_header_ref (
217- & mut self ,
218- command_custom_header : impl CommandCustomHeader + Send + Sync + ' static ,
219- ) {
220- self . command_custom_header = Some ( Arc :: new ( command_custom_header) ) ;
221- if let Some ( cch) = & self . command_custom_header {
220+ pub fn set_command_custom_header_ref < T > ( & mut self , command_custom_header : T )
221+ where
222+ T : CommandCustomHeader + Sync + Send + ' static ,
223+ {
224+ self . command_custom_header = Some ( Arc :: new ( SyncUnsafeCell :: new ( command_custom_header) ) ) ;
225+ /* if let Some(cch) = &self.command_custom_header {
222226 let option = cch.to_map();
223227
224228 match &mut self.ext_fields {
@@ -233,7 +237,7 @@ impl RemotingCommand {
233237 }
234238 }
235239 }
236- }
240+ }*/
237241 }
238242
239243 pub fn set_code ( mut self , code : impl Into < i32 > ) -> Self {
@@ -322,20 +326,89 @@ impl RemotingCommand {
322326 }
323327
324328 pub fn header_encode ( & self ) -> Option < Bytes > {
325- self . command_custom_header . as_ref ( ) . and_then ( |cch| {
326- cch. to_map ( )
329+ self . command_custom_header . as_ref ( ) . and_then ( |header| {
330+ let header_ptr = header. get ( ) ;
331+ let header_ref = unsafe { & * ( header_ptr as * const dyn CommandCustomHeader ) } ;
332+ header_ref
333+ . to_map ( )
327334 . as_ref ( )
328335 . map ( |val| Bytes :: from ( serde_json:: to_vec ( val) . unwrap ( ) ) )
329336 } )
330337 }
331338
332- pub fn fast_header_encode ( & self ) -> Option < Bytes > {
339+ pub fn make_custom_header_to_net ( & mut self ) {
340+ if let Some ( header) = & self . command_custom_header {
341+ let header_ptr = header. get ( ) ;
342+ let header_ref = unsafe { & * ( header_ptr as * const dyn CommandCustomHeader ) } ;
343+ let option = header_ref. to_map ( ) ;
344+
345+ match & mut self . ext_fields {
346+ None => {
347+ self . ext_fields = option;
348+ }
349+ Some ( ext) => {
350+ if let Some ( val) = option {
351+ for ( key, value) in & val {
352+ ext. insert ( key. clone ( ) , value. clone ( ) ) ;
353+ }
354+ }
355+ }
356+ }
357+ }
358+ }
359+
360+ /* pub fn fast_header_encode(&self) -> Option<Bytes> {
333361 let st = serde_json::to_string(self).unwrap();
334362 Some(Bytes::from(st))
363+ }*/
364+
365+ pub fn fast_header_encode ( & mut self , dst : & mut BytesMut ) {
366+ match self . serialize_type {
367+ SerializeType :: JSON => {
368+ self . make_custom_header_to_net ( ) ;
369+ let header = match serde_json:: to_vec ( self ) {
370+ Ok ( value) => Some ( value) ,
371+ Err ( e) => {
372+ error ! ( "Failed to encode generic: {}" , e) ;
373+ None
374+ }
375+ } ;
376+ let header_length = header. as_ref ( ) . map_or ( 0 , |h| h. len ( ) ) as i32 ;
377+ let body_length = self . body . as_ref ( ) . map_or ( 0 , |b| b. len ( ) ) as i32 ;
378+ let total_length = 4 + header_length + body_length;
379+
380+ dst. reserve ( ( total_length + 4 ) as usize ) ;
381+ dst. put_i32 ( total_length) ;
382+ let serialize_type =
383+ RemotingCommand :: mark_serialize_type ( header_length, SerializeType :: JSON ) ;
384+ dst. put_i32 ( serialize_type) ;
385+
386+ if let Some ( header_inner) = header {
387+ dst. put ( header_inner. as_slice ( ) ) ;
388+ }
389+ }
390+ SerializeType :: ROCKETMQ => { }
391+ }
392+
393+ /*let header_length = header.as_ref().map_or(0, |h| h.len()) as i32;
394+ let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
395+ let total_length = 4 + header_length + body_length;
396+
397+ dst.reserve((total_length + 4) as usize);
398+ dst.put_i32(total_length);
399+ let serialize_type =
400+ RemotingCommand::mark_serialize_type(header_length, item.get_serialize_type());
401+ dst.put_i32(serialize_type);
402+
403+ if let Some(header_inner) = header {
404+ dst.put(header_inner);
405+ }
406+
407+ let st = serde_json::to_string(self).unwrap();*/
335408 }
336409
337- pub fn get_body ( & self ) -> Option < Bytes > {
338- self . body . as_ref ( ) . cloned ( )
410+ pub fn get_body ( & self ) -> Option < & Bytes > {
411+ self . body . as_ref ( )
339412 }
340413
341414 pub fn mark_serialize_type ( header_length : i32 , protocol_type : SerializeType ) -> i32 {
@@ -453,6 +526,34 @@ impl RemotingCommand {
453526 pub fn get_ext_fields ( & self ) -> Option < & HashMap < String , String > > {
454527 self . ext_fields . as_ref ( )
455528 }
529+
530+ pub fn read_custom_header < T > ( & mut self ) -> Option < & T >
531+ where
532+ T : CommandCustomHeader + Sync + Send ,
533+ {
534+ match self . command_custom_header . as_ref ( ) {
535+ None => None ,
536+ Some ( value) => {
537+ let value = value. get ( ) ;
538+ let value = value as * const dyn CommandCustomHeader as * const T ;
539+ unsafe { Some ( & * value) }
540+ }
541+ }
542+ }
543+
544+ pub fn read_custom_header_mut < T > ( & mut self ) -> Option < & mut T >
545+ where
546+ T : CommandCustomHeader + Sync + Send ,
547+ {
548+ match self . command_custom_header . as_ref ( ) {
549+ None => None ,
550+ Some ( value) => {
551+ let value = value. get ( ) ;
552+ let value = value as * const dyn CommandCustomHeader as * mut T ;
553+ unsafe { Some ( & mut * value) }
554+ }
555+ }
556+ }
456557}
457558
458559impl AsRef < RemotingCommand > for RemotingCommand {
0 commit comments