Skip to content

Commit 86d94d9

Browse files
committed
[ISSUE #1648]🚀Add ConsumeRequest for ConsumeMessagePopConcurrentlyService🔥
1 parent cc274b0 commit 86d94d9

1 file changed

Lines changed: 179 additions & 1 deletion

File tree

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs

Lines changed: 179 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ use cheetah_string::CheetahString;
2121
use rocketmq_common::common::message::message_client_ext::MessageClientExt;
2222
use rocketmq_common::common::message::message_ext::MessageExt;
2323
use rocketmq_common::common::message::message_queue::MessageQueue;
24+
use rocketmq_common::common::mix_all;
25+
use rocketmq_common::MessageAccessor::MessageAccessor;
26+
use rocketmq_common::TimeUtils::get_current_millis;
2427
use rocketmq_remoting::protocol::body::cm_result::CMResult;
2528
use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult;
2629
use rocketmq_rust::ArcMut;
2730
use tracing::info;
31+
use tracing::warn;
2832

2933
use crate::base::client_config::ClientConfig;
3034
use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait;
@@ -34,7 +38,9 @@ use crate::consumer::consumer_impl::process_queue::ProcessQueue;
3438
use crate::consumer::default_mq_push_consumer::ConsumerConfig;
3539
use crate::consumer::listener::consume_concurrently_context::ConsumeConcurrentlyContext;
3640
use crate::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus;
41+
use crate::consumer::listener::consume_return_type::ConsumeReturnType;
3742
use crate::consumer::listener::message_listener_concurrently::ArcBoxMessageListenerConcurrently;
43+
use crate::hook::consume_message_context::ConsumeMessageContext;
3844

3945
pub struct ConsumeMessagePopConcurrentlyService {
4046
pub(crate) default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
@@ -146,7 +152,9 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
146152
message_queue: MessageQueue,
147153
dispatch_to_consume: bool,
148154
) {
149-
todo!()
155+
unimplemented!(
156+
"ConsumeMessagePopConcurrentlyService.submit_consume_request is not supported"
157+
)
150158
}
151159

152160
async fn submit_pop_consume_request(
@@ -158,3 +166,173 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService {
158166
todo!()
159167
}
160168
}
169+
170+
impl ConsumeMessagePopConcurrentlyService {
171+
async fn process_consume_result(
172+
&mut self,
173+
this: ArcMut<Self>,
174+
status: ConsumeConcurrentlyStatus,
175+
context: &ConsumeConcurrentlyContext,
176+
consume_request: &mut ConsumeRequest,
177+
) {
178+
unimplemented!("ConsumeMessagePopConcurrentlyService.process_consume_result")
179+
}
180+
}
181+
182+
struct ConsumeRequest {
183+
msgs: Vec<ArcMut<MessageClientExt>>,
184+
process_queue: Arc<PopProcessQueue>,
185+
message_queue: MessageQueue,
186+
pop_time: u64,
187+
invisible_time: u64,
188+
consumer_group: CheetahString,
189+
message_listener: ArcBoxMessageListenerConcurrently,
190+
default_mqpush_consumer_impl: Option<ArcMut<DefaultMQPushConsumerImpl>>,
191+
}
192+
193+
impl ConsumeRequest {
194+
pub fn new(
195+
msgs: Vec<ArcMut<MessageClientExt>>,
196+
process_queue: Arc<PopProcessQueue>,
197+
message_queue: MessageQueue,
198+
pop_time: u64,
199+
invisible_time: u64,
200+
) -> Self {
201+
unimplemented!()
202+
}
203+
204+
#[inline]
205+
pub fn is_pop_timeout(&self) -> bool {
206+
if self.msgs.is_empty() || self.pop_time == 0 || self.invisible_time == 0 {
207+
return true;
208+
}
209+
get_current_millis().saturating_sub(self.pop_time) >= self.invisible_time
210+
}
211+
212+
pub async fn run(
213+
&mut self,
214+
mut consume_message_concurrently_service: ArcMut<ConsumeMessagePopConcurrentlyService>,
215+
) {
216+
if self.process_queue.is_dropped() {
217+
info!(
218+
"the message queue not be able to consume, because it's dropped(pop). group={} {}",
219+
self.consumer_group, self.message_queue
220+
);
221+
return;
222+
}
223+
if self.is_pop_timeout() {
224+
info!(
225+
"the pop message time out so abort consume. popTime={} invisibleTime={}, group={} \
226+
{}",
227+
self.pop_time, self.invisible_time, self.consumer_group, self.message_queue
228+
);
229+
self.process_queue.dec_found_msg(self.msgs.len());
230+
return;
231+
}
232+
let context = ConsumeConcurrentlyContext {
233+
message_queue: self.message_queue.clone(),
234+
delay_level_when_next_consume: 0,
235+
ack_index: i32::MAX,
236+
};
237+
238+
let mut default_mqpush_consumer_impl =
239+
self.default_mqpush_consumer_impl.as_ref().unwrap().clone();
240+
default_mqpush_consumer_impl
241+
.reset_retry_and_namespace(&mut self.msgs, self.consumer_group.as_str());
242+
let mut consume_message_context = None;
243+
244+
let begin_timestamp = Instant::now();
245+
let mut has_exception = false;
246+
let mut return_type = ConsumeReturnType::Success;
247+
let mut status = None;
248+
249+
if !self.msgs.is_empty() {
250+
for msg in self.msgs.iter_mut() {
251+
MessageAccessor::set_consume_start_time_stamp(
252+
&mut msg.message_ext_inner,
253+
CheetahString::from_string(get_current_millis().to_string()),
254+
);
255+
}
256+
}
257+
258+
if default_mqpush_consumer_impl.has_hook() {
259+
let queue = self.message_queue.clone();
260+
consume_message_context = Some(ConsumeMessageContext {
261+
consumer_group: self.consumer_group.clone(),
262+
msg_list: &self.msgs,
263+
mq: Some(queue),
264+
success: false,
265+
status: CheetahString::new(),
266+
mq_trace_context: None,
267+
props: Default::default(),
268+
namespace: default_mqpush_consumer_impl
269+
.client_config
270+
.get_namespace()
271+
.unwrap_or_default(),
272+
access_channel: Default::default(),
273+
});
274+
default_mqpush_consumer_impl.execute_hook_before(&mut consume_message_context);
275+
}
276+
let vec = self
277+
.msgs
278+
.iter()
279+
.map(|msg| &msg.message_ext_inner)
280+
.collect::<Vec<&MessageExt>>();
281+
match self.message_listener.consume_message(&vec, &context) {
282+
Ok(value) => {
283+
status = Some(value);
284+
}
285+
Err(_) => {
286+
has_exception = true;
287+
}
288+
}
289+
let consume_rt = begin_timestamp.elapsed().as_millis() as u64;
290+
if status.is_none() {
291+
if has_exception {
292+
return_type = ConsumeReturnType::Exception;
293+
} else {
294+
return_type = ConsumeReturnType::ReturnNull;
295+
}
296+
} else if consume_rt
297+
> default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000
298+
{
299+
return_type = ConsumeReturnType::TimeOut;
300+
} else if status.unwrap() == ConsumeConcurrentlyStatus::ReconsumeLater {
301+
return_type = ConsumeReturnType::Failed;
302+
} else if status.unwrap() == ConsumeConcurrentlyStatus::ConsumeSuccess {
303+
return_type = ConsumeReturnType::Success;
304+
}
305+
306+
if default_mqpush_consumer_impl.has_hook() {
307+
consume_message_context.as_mut().unwrap().props.insert(
308+
CheetahString::from_static_str(mix_all::CONSUME_CONTEXT_TYPE),
309+
return_type.to_string().into(),
310+
);
311+
}
312+
313+
if status.is_none() {
314+
status = Some(ConsumeConcurrentlyStatus::ReconsumeLater);
315+
}
316+
317+
if default_mqpush_consumer_impl.has_hook() {
318+
let cmc = consume_message_context.as_mut().unwrap();
319+
cmc.status = status.unwrap().to_string().into();
320+
cmc.success = status.unwrap() == ConsumeConcurrentlyStatus::ConsumeSuccess;
321+
cmc.access_channel = Some(default_mqpush_consumer_impl.client_config.access_channel);
322+
default_mqpush_consumer_impl.execute_hook_after(&mut consume_message_context);
323+
}
324+
325+
if self.process_queue.is_dropped() {
326+
warn!(
327+
"the message queue not be able to consume, because it's dropped. group={} {}",
328+
self.consumer_group, self.message_queue,
329+
);
330+
} else {
331+
let this = consume_message_concurrently_service.clone();
332+
333+
consume_message_concurrently_service
334+
.process_consume_result(this, status.unwrap(), &context, self)
335+
.await;
336+
}
337+
}
338+
}

0 commit comments

Comments
 (0)