@@ -19,6 +19,7 @@ use std::net::SocketAddr;
1919use bytes:: Bytes ;
2020use cheetah_string:: CheetahString ;
2121use rocketmq_common:: common:: compression:: compressor_factory:: CompressorFactory ;
22+ use rocketmq_common:: common:: message:: message_decoder;
2223use rocketmq_common:: common:: message:: message_ext:: MessageExt ;
2324use rocketmq_common:: common:: message:: MessageConst ;
2425use rocketmq_common:: common:: message:: MessageTrait ;
@@ -30,10 +31,13 @@ use rocketmq_remoting::code::request_code::RequestCode;
3031use rocketmq_remoting:: code:: response_code:: ResponseCode ;
3132use rocketmq_remoting:: net:: channel:: Channel ;
3233use rocketmq_remoting:: protocol:: header:: check_transaction_state_request_header:: CheckTransactionStateRequestHeader ;
34+ use rocketmq_remoting:: protocol:: header:: consume_message_directly_result_request_header:: ConsumeMessageDirectlyResultRequestHeader ;
3335use rocketmq_remoting:: protocol:: header:: notify_consumer_ids_changed_request_header:: NotifyConsumerIdsChangedRequestHeader ;
3436use rocketmq_remoting:: protocol:: header:: reply_message_request_header:: ReplyMessageRequestHeader ;
3537use rocketmq_remoting:: protocol:: namespace_util:: NamespaceUtil ;
3638use rocketmq_remoting:: protocol:: remoting_command:: RemotingCommand ;
39+ use rocketmq_remoting:: protocol:: RemotingSerializable ;
40+ use rocketmq_remoting:: remoting_error:: RemotingError :: RemotingCommandError ;
3741use rocketmq_remoting:: runtime:: connection_handler_context:: ConnectionHandlerContext ;
3842use rocketmq_remoting:: runtime:: processor:: RequestProcessor ;
3943use rocketmq_remoting:: Result ;
@@ -79,7 +83,7 @@ impl RequestProcessor for ClientRemotingProcessor {
7983 unimplemented ! ( "GetConsumerRunningInfo" )
8084 }
8185 RequestCode :: ConsumeMessageDirectly => {
82- unimplemented ! ( "ConsumeMessageDirectly" )
86+ self . consume_message_directly ( channel , ctx , request ) . await
8387 }
8488 //RPC message handle code
8589 RequestCode :: PushReplyMessageToClient => self . receive_reply_message ( ctx, request) . await ,
@@ -272,4 +276,52 @@ impl ClientRemotingProcessor {
272276 } ;
273277 Ok ( None )
274278 }
279+
280+ async fn consume_message_directly (
281+ & mut self ,
282+ channel : Channel ,
283+ ctx : ConnectionHandlerContext ,
284+ mut request : RemotingCommand ,
285+ ) -> Result < Option < RemotingCommand > > {
286+ let request_header =
287+ request. decode_command_custom_header :: < ConsumeMessageDirectlyResultRequestHeader > ( ) ?;
288+ let body = request
289+ . get_body_mut ( )
290+ . ok_or ( RemotingCommandError ( "body is empty" . to_string ( ) ) ) ?;
291+ let msg = message_decoder:: decode ( body, true , true , false , false , false )
292+ . ok_or ( RemotingCommandError ( "decode message failed" . to_string ( ) ) ) ?;
293+
294+ if let Some ( client_instance) = self . client_instance . upgrade ( ) {
295+ let result = client_instance
296+ . consume_message_directly (
297+ msg,
298+ & request_header. consumer_group ,
299+ request_header. broker_name . clone ( ) ,
300+ )
301+ . await ;
302+ if let Some ( result) = result {
303+ let body = result
304+ . encode ( )
305+ . map_err ( |_| RemotingCommandError ( "encode result failed" . to_string ( ) ) ) ?;
306+ Ok ( Some (
307+ RemotingCommand :: create_response_command ( ) . set_body ( body) ,
308+ ) )
309+ } else {
310+ warn ! ( "consumeMessageDirectly, consume message failed" ) ;
311+ Ok ( Some (
312+ RemotingCommand :: create_response_command_with_code ( ResponseCode :: SystemError )
313+ . set_remark ( format ! (
314+ "The Consumer Group <{}> not exist in this consumer" ,
315+ request_header. consumer_group
316+ ) ) ,
317+ ) )
318+ }
319+ } else {
320+ warn ! ( "consumeMessageDirectly, client_instance is empty" ) ;
321+ Ok ( Some (
322+ RemotingCommand :: create_response_command_with_code ( ResponseCode :: SystemError )
323+ . set_remark ( "client_instance is empty" ) ,
324+ ) )
325+ }
326+ }
275327}
0 commit comments