@@ -30,7 +30,6 @@ use rocketmq_common::TimeUtils::get_current_millis;
3030use rocketmq_common:: WeakCellWrapper ;
3131use rocketmq_remoting:: protocol:: body:: consume_message_directly_result:: ConsumeMessageDirectlyResult ;
3232use rocketmq_remoting:: protocol:: heartbeat:: message_model:: MessageModel ;
33- use rocketmq_runtime:: RocketMQRuntime ;
3433use tracing:: info;
3534use tracing:: warn;
3635
@@ -53,7 +52,7 @@ pub struct ConsumeMessageConcurrentlyService {
5352 pub ( crate ) consumer_config : ArcRefCellWrapper < ConsumerConfig > ,
5453 pub ( crate ) consumer_group : Arc < String > ,
5554 pub ( crate ) message_listener : ArcBoxMessageListenerConcurrently ,
56- pub ( crate ) consume_runtime : Arc < RocketMQRuntime > ,
55+ // pub(crate) consume_runtime: Arc<RocketMQRuntime>,
5756}
5857
5958impl ConsumeMessageConcurrentlyService {
@@ -71,10 +70,10 @@ impl ConsumeMessageConcurrentlyService {
7170 consumer_config,
7271 consumer_group : Arc :: new ( consumer_group) ,
7372 message_listener,
74- consume_runtime : Arc :: new ( RocketMQRuntime :: new_multi (
73+ /* consume_runtime: Arc::new(RocketMQRuntime::new_multi(
7574 consume_thread as usize,
7675 "ConsumeMessageThread_",
77- ) ) ,
76+ )),*/
7877 }
7978 }
8079}
@@ -140,12 +139,6 @@ impl ConsumeMessageConcurrentlyService {
140139 }
141140 if !msg_back_failed. is_empty ( ) {
142141 consume_request. msgs . append ( & mut msg_back_success) ;
143- /* let msg_back_failed_switched = msg_back_failed
144- .into_iter()
145- .map(|msg| MessageClientExt {
146- message_ext_inner: msg,
147- })
148- .collect();*/
149142 self . submit_consume_request_later (
150143 msg_back_failed,
151144 consume_request. process_queue . clone ( ) ,
@@ -182,7 +175,7 @@ impl ConsumeMessageConcurrentlyService {
182175 message_queue : MessageQueue ,
183176 ) {
184177 let this = self . clone ( ) ;
185- self . consume_runtime . get_handle ( ) . spawn ( async move {
178+ tokio :: spawn ( async move {
186179 tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
187180 this. submit_consume_request ( msgs, process_queue, message_queue, true )
188181 . await ;
@@ -231,8 +224,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
231224 } ) ;
232225 }
233226
234- fn shutdown ( & self , await_terminate_millis : u64 ) {
235- todo ! ( )
227+ fn shutdown ( & mut self , await_terminate_millis : u64 ) {
228+ // todo!()
236229 }
237230
238231 fn update_core_pool_size ( & self , core_pool_size : usize ) {
@@ -267,7 +260,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
267260 dispatch_to_consume : bool ,
268261 ) {
269262 let consume_batch_size = self . consumer_config . consume_message_batch_max_size ;
270- if msgs. len ( ) < consume_batch_size as usize {
263+ if msgs. len ( ) <= consume_batch_size as usize {
271264 let mut consume_request = ConsumeRequest {
272265 msgs : msgs. clone ( ) ,
273266 message_listener : self . message_listener . clone ( ) ,
@@ -278,7 +271,8 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
278271 default_mqpush_consumer_impl : self . default_mqpush_consumer_impl . clone ( ) ,
279272 } ;
280273 let consume_message_concurrently_service = self . clone ( ) ;
281- self . consume_runtime . get_handle ( ) . spawn ( async move {
274+
275+ tokio:: spawn ( async move {
282276 consume_request
283277 . run ( consume_message_concurrently_service)
284278 . await
@@ -301,7 +295,12 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService {
301295 default_mqpush_consumer_impl : self . default_mqpush_consumer_impl . clone ( ) ,
302296 } ;
303297 let consume_message_concurrently_service = self . clone ( ) ;
304- self . consume_runtime . get_handle ( ) . spawn ( async move {
298+ /* self.consume_runtime.get_handle().spawn(async move {
299+ consume_request
300+ .run(consume_message_concurrently_service)
301+ .await
302+ });*/
303+ tokio:: spawn ( async move {
305304 consume_request
306305 . run ( consume_message_concurrently_service)
307306 . await
0 commit comments