@@ -21,10 +21,14 @@ use cheetah_string::CheetahString;
2121use rocketmq_common:: common:: message:: message_client_ext:: MessageClientExt ;
2222use rocketmq_common:: common:: message:: message_ext:: MessageExt ;
2323use rocketmq_common:: common:: message:: message_queue:: MessageQueue ;
24+ use rocketmq_common:: common:: mix_all;
25+ use rocketmq_common:: MessageAccessor :: MessageAccessor ;
26+ use rocketmq_common:: TimeUtils :: get_current_millis;
2427use rocketmq_remoting:: protocol:: body:: cm_result:: CMResult ;
2528use rocketmq_remoting:: protocol:: body:: consume_message_directly_result:: ConsumeMessageDirectlyResult ;
2629use rocketmq_rust:: ArcMut ;
2730use tracing:: info;
31+ use tracing:: warn;
2832
2933use crate :: base:: client_config:: ClientConfig ;
3034use crate :: consumer:: consumer_impl:: consume_message_service:: ConsumeMessageServiceTrait ;
@@ -34,7 +38,9 @@ use crate::consumer::consumer_impl::process_queue::ProcessQueue;
3438use crate :: consumer:: default_mq_push_consumer:: ConsumerConfig ;
3539use crate :: consumer:: listener:: consume_concurrently_context:: ConsumeConcurrentlyContext ;
3640use crate :: consumer:: listener:: consume_concurrently_status:: ConsumeConcurrentlyStatus ;
41+ use crate :: consumer:: listener:: consume_return_type:: ConsumeReturnType ;
3742use crate :: consumer:: listener:: message_listener_concurrently:: ArcBoxMessageListenerConcurrently ;
43+ use crate :: hook:: consume_message_context:: ConsumeMessageContext ;
3844
3945pub struct ConsumeMessagePopConcurrentlyService {
4046 pub ( crate ) default_mqpush_consumer_impl : Option < ArcMut < DefaultMQPushConsumerImpl > > ,
@@ -146,7 +152,9 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
146152 message_queue : MessageQueue ,
147153 dispatch_to_consume : bool ,
148154 ) {
149- todo ! ( )
155+ unimplemented ! (
156+ "ConsumeMessagePopConcurrentlyService.submit_consume_request is not supported"
157+ )
150158 }
151159
152160 async fn submit_pop_consume_request (
@@ -158,3 +166,173 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
158166 todo ! ( )
159167 }
160168}
169+
170+ impl ConsumeMessagePopConcurrentlyService {
171+ async fn process_consume_result (
172+ & mut self ,
173+ this : ArcMut < Self > ,
174+ status : ConsumeConcurrentlyStatus ,
175+ context : & ConsumeConcurrentlyContext ,
176+ consume_request : & mut ConsumeRequest ,
177+ ) {
178+ unimplemented ! ( "ConsumeMessagePopConcurrentlyService.process_consume_result" )
179+ }
180+ }
181+
182+ struct ConsumeRequest {
183+ msgs : Vec < ArcMut < MessageClientExt > > ,
184+ process_queue : Arc < PopProcessQueue > ,
185+ message_queue : MessageQueue ,
186+ pop_time : u64 ,
187+ invisible_time : u64 ,
188+ consumer_group : CheetahString ,
189+ message_listener : ArcBoxMessageListenerConcurrently ,
190+ default_mqpush_consumer_impl : Option < ArcMut < DefaultMQPushConsumerImpl > > ,
191+ }
192+
193+ impl ConsumeRequest {
194+ pub fn new (
195+ msgs : Vec < ArcMut < MessageClientExt > > ,
196+ process_queue : Arc < PopProcessQueue > ,
197+ message_queue : MessageQueue ,
198+ pop_time : u64 ,
199+ invisible_time : u64 ,
200+ ) -> Self {
201+ unimplemented ! ( )
202+ }
203+
204+ #[ inline]
205+ pub fn is_pop_timeout ( & self ) -> bool {
206+ if self . msgs . is_empty ( ) || self . pop_time == 0 || self . invisible_time == 0 {
207+ return true ;
208+ }
209+ get_current_millis ( ) . saturating_sub ( self . pop_time ) >= self . invisible_time
210+ }
211+
212+ pub async fn run (
213+ & mut self ,
214+ mut consume_message_concurrently_service : ArcMut < ConsumeMessagePopConcurrentlyService > ,
215+ ) {
216+ if self . process_queue . is_dropped ( ) {
217+ info ! (
218+ "the message queue not be able to consume, because it's dropped(pop). group={} {}" ,
219+ self . consumer_group, self . message_queue
220+ ) ;
221+ return ;
222+ }
223+ if self . is_pop_timeout ( ) {
224+ info ! (
225+ "the pop message time out so abort consume. popTime={} invisibleTime={}, group={} \
226+ {}",
227+ self . pop_time, self . invisible_time, self . consumer_group, self . message_queue
228+ ) ;
229+ self . process_queue . dec_found_msg ( self . msgs . len ( ) ) ;
230+ return ;
231+ }
232+ let context = ConsumeConcurrentlyContext {
233+ message_queue : self . message_queue . clone ( ) ,
234+ delay_level_when_next_consume : 0 ,
235+ ack_index : i32:: MAX ,
236+ } ;
237+
238+ let mut default_mqpush_consumer_impl =
239+ self . default_mqpush_consumer_impl . as_ref ( ) . unwrap ( ) . clone ( ) ;
240+ default_mqpush_consumer_impl
241+ . reset_retry_and_namespace ( & mut self . msgs , self . consumer_group . as_str ( ) ) ;
242+ let mut consume_message_context = None ;
243+
244+ let begin_timestamp = Instant :: now ( ) ;
245+ let mut has_exception = false ;
246+ let mut return_type = ConsumeReturnType :: Success ;
247+ let mut status = None ;
248+
249+ if !self . msgs . is_empty ( ) {
250+ for msg in self . msgs . iter_mut ( ) {
251+ MessageAccessor :: set_consume_start_time_stamp (
252+ & mut msg. message_ext_inner ,
253+ CheetahString :: from_string ( get_current_millis ( ) . to_string ( ) ) ,
254+ ) ;
255+ }
256+ }
257+
258+ if default_mqpush_consumer_impl. has_hook ( ) {
259+ let queue = self . message_queue . clone ( ) ;
260+ consume_message_context = Some ( ConsumeMessageContext {
261+ consumer_group : self . consumer_group . clone ( ) ,
262+ msg_list : & self . msgs ,
263+ mq : Some ( queue) ,
264+ success : false ,
265+ status : CheetahString :: new ( ) ,
266+ mq_trace_context : None ,
267+ props : Default :: default ( ) ,
268+ namespace : default_mqpush_consumer_impl
269+ . client_config
270+ . get_namespace ( )
271+ . unwrap_or_default ( ) ,
272+ access_channel : Default :: default ( ) ,
273+ } ) ;
274+ default_mqpush_consumer_impl. execute_hook_before ( & mut consume_message_context) ;
275+ }
276+ let vec = self
277+ . msgs
278+ . iter ( )
279+ . map ( |msg| & msg. message_ext_inner )
280+ . collect :: < Vec < & MessageExt > > ( ) ;
281+ match self . message_listener . consume_message ( & vec, & context) {
282+ Ok ( value) => {
283+ status = Some ( value) ;
284+ }
285+ Err ( _) => {
286+ has_exception = true ;
287+ }
288+ }
289+ let consume_rt = begin_timestamp. elapsed ( ) . as_millis ( ) as u64 ;
290+ if status. is_none ( ) {
291+ if has_exception {
292+ return_type = ConsumeReturnType :: Exception ;
293+ } else {
294+ return_type = ConsumeReturnType :: ReturnNull ;
295+ }
296+ } else if consume_rt
297+ > default_mqpush_consumer_impl. consumer_config . consume_timeout * 60 * 1000
298+ {
299+ return_type = ConsumeReturnType :: TimeOut ;
300+ } else if status. unwrap ( ) == ConsumeConcurrentlyStatus :: ReconsumeLater {
301+ return_type = ConsumeReturnType :: Failed ;
302+ } else if status. unwrap ( ) == ConsumeConcurrentlyStatus :: ConsumeSuccess {
303+ return_type = ConsumeReturnType :: Success ;
304+ }
305+
306+ if default_mqpush_consumer_impl. has_hook ( ) {
307+ consume_message_context. as_mut ( ) . unwrap ( ) . props . insert (
308+ CheetahString :: from_static_str ( mix_all:: CONSUME_CONTEXT_TYPE ) ,
309+ return_type. to_string ( ) . into ( ) ,
310+ ) ;
311+ }
312+
313+ if status. is_none ( ) {
314+ status = Some ( ConsumeConcurrentlyStatus :: ReconsumeLater ) ;
315+ }
316+
317+ if default_mqpush_consumer_impl. has_hook ( ) {
318+ let cmc = consume_message_context. as_mut ( ) . unwrap ( ) ;
319+ cmc. status = status. unwrap ( ) . to_string ( ) . into ( ) ;
320+ cmc. success = status. unwrap ( ) == ConsumeConcurrentlyStatus :: ConsumeSuccess ;
321+ cmc. access_channel = Some ( default_mqpush_consumer_impl. client_config . access_channel ) ;
322+ default_mqpush_consumer_impl. execute_hook_after ( & mut consume_message_context) ;
323+ }
324+
325+ if self . process_queue . is_dropped ( ) {
326+ warn ! (
327+ "the message queue not be able to consume, because it's dropped. group={} {}" ,
328+ self . consumer_group, self . message_queue,
329+ ) ;
330+ } else {
331+ let this = consume_message_concurrently_service. clone ( ) ;
332+
333+ consume_message_concurrently_service
334+ . process_consume_result ( this, status. unwrap ( ) , & context, self )
335+ . await ;
336+ }
337+ }
338+ }
0 commit comments