Skip to content

Commit d829bf6

Browse files
authored
[ISSUE #735] 🔥Optimize pull message logic (#736)
1 parent e5330d1 commit d829bf6

17 files changed

Lines changed: 485 additions & 292 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,4 @@ trait-variant = "0.1.2"
6565
once_cell = "1.19.0"
6666

6767
mockall = "0.12.1"
68+
cfg-if = "1.0.0"

rocketmq-broker/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ dirs.workspace = true
5757
local-ip-address = "0.6.1"
5858
dns-lookup = "2.0"
5959
log = "0.4.22"
60-
60+
cfg-if = { workspace = true }
6161
[dev-dependencies]
6262
mockall = "0.12.1"
6363
static_assertions = { version = "1" }

rocketmq-broker/src/client/manager/consumer_manager.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,20 @@ impl ConsumerManager {
141141
.insert(topic.to_string(), subscription_data.clone());
142142
}
143143

144+
pub fn compensate_basic_consumer_info(
145+
&self,
146+
group: &str,
147+
consume_type: ConsumeType,
148+
message_model: MessageModel,
149+
) {
150+
let mut write_guard = self.consumer_compensation_table.write();
151+
let consumer_group_info = write_guard
152+
.entry(group.to_string())
153+
.or_insert_with(|| ConsumerGroupInfo::with_group_name(group.to_string()));
154+
consumer_group_info.set_consume_type(consume_type);
155+
consumer_group_info.set_message_model(message_model);
156+
}
157+
144158
pub fn register_consumer(
145159
&self,
146160
group: &str,

rocketmq-broker/src/coldctr/cold_data_cg_ctr_service.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,10 @@
1616
*/
1717

1818
#[derive(Default)]
19-
pub struct ColdDataCgCtrService {}
19+
pub struct ColdDataCgCtrService;
20+
21+
impl ColdDataCgCtrService {
22+
pub fn is_cg_need_cold_data_flow_ctr(&self, _consumer_group: &str) -> bool {
23+
false
24+
}
25+
}

rocketmq-broker/src/filter/expression_for_retry_message_filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ impl MessageFilter for ExpressionForRetryMessageFilter {
2828
tags_code: Option<i64>,
2929
cq_ext_unit: Option<&CqExtUnit>,
3030
) -> bool {
31-
todo!()
31+
true
3232
}
3333

3434
fn is_matched_by_commit_log(
3535
&self,
3636
msg_buffer: Option<&[u8]>,
3737
properties: Option<&HashMap<String, String>>,
3838
) -> bool {
39-
todo!()
39+
true
4040
}
4141
}

rocketmq-broker/src/filter/expression_message_filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ impl MessageFilter for ExpressionMessageFilter {
2828
tags_code: Option<i64>,
2929
cq_ext_unit: Option<&CqExtUnit>,
3030
) -> bool {
31-
todo!()
31+
true
3232
}
3333

3434
fn is_matched_by_commit_log(
3535
&self,
3636
msg_buffer: Option<&[u8]>,
3737
properties: Option<&HashMap<String, String>>,
3838
) -> bool {
39-
todo!()
39+
true
4040
}
4141
}

rocketmq-broker/src/processor/default_pull_message_result_handler.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use bytes::BytesMut;
2323
use rocketmq_common::common::broker::broker_config::BrokerConfig;
2424
use rocketmq_common::common::mix_all::MASTER_ID;
2525
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
26+
use rocketmq_common::TimeUtils::get_current_millis;
2627
use rocketmq_remoting::code::response_code::RemotingSysResponseCode;
2728
use rocketmq_remoting::code::response_code::ResponseCode;
2829
use rocketmq_remoting::net::channel::Channel;
@@ -44,6 +45,7 @@ use tracing::debug;
4445
use tracing::info;
4546

4647
use crate::client::manager::consumer_manager::ConsumerManager;
48+
use crate::long_polling::pull_request::PullRequest;
4749
use crate::mqtrace::consume_message_context::ConsumeMessageContext;
4850
use crate::mqtrace::consume_message_hook::ConsumeMessageHook;
4951
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
@@ -166,18 +168,20 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
166168
request_header.topic.as_str(),
167169
get_message_result.message_count(),
168170
);
171+
172+
ctx.upgrade()?;
173+
169174
if self.broker_config.transfer_msg_by_heap {
170175
let body = self.read_get_message_result(
171176
&get_message_result,
172177
request_header.consumer_group.as_str(),
173178
request_header.topic.as_str(),
174179
request_header.queue_id.unwrap(),
175180
);
176-
return Some(response.set_body(body));
177-
} /*else {
178-
None
179-
}*/
180-
None
181+
Some(response.set_body(body))
182+
} else {
183+
None
184+
}
181185
}
182186
ResponseCode::PullNotFound => {
183187
let has_suspend_flag =
@@ -196,17 +200,17 @@ impl PullMessageResultHandler for DefaultPullMessageResultHandler {
196200
let queue_id = request_header.queue_id.unwrap();
197201
let offset = request_header.queue_offset;
198202

199-
/* PullRequest::new(
203+
let pull_request = PullRequest::new(
200204
request,
201205
channel,
206+
ctx,
202207
polling_time_mills,
203-
begin_time_mills,
208+
get_current_millis(),
204209
offset,
205210
subscription_data,
206-
message_filter,
207-
);*/
211+
Arc::new(message_filter),
212+
);
208213
}
209-
210214
None
211215
}
212216
ResponseCode::PullOffsetMoved => Some(response),

rocketmq-broker/src/processor/pull_message_processor.rs

Lines changed: 88 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use tracing::warn;
5050

5151
use crate::client::consumer_group_info::ConsumerGroupInfo;
5252
use crate::client::manager::consumer_manager::ConsumerManager;
53+
use crate::coldctr::cold_data_cg_ctr_service::ColdDataCgCtrService;
5354
use crate::coldctr::cold_data_pull_request_hold_service::NO_SUSPEND_KEY;
5455
use crate::filter::expression_for_retry_message_filter::ExpressionForRetryMessageFilter;
5556
use crate::filter::expression_message_filter::ExpressionMessageFilter;
@@ -73,6 +74,7 @@ pub struct PullMessageProcessor<MS> {
7374
consumer_offset_manager: Arc<ConsumerOffsetManager>,
7475
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
7576
message_store: Arc<MS>,
77+
cold_data_cg_ctr_service: Arc<ColdDataCgCtrService>,
7678
}
7779

7880
impl<MS> PullMessageProcessor<MS> {
@@ -99,6 +101,7 @@ impl<MS> PullMessageProcessor<MS> {
99101
consumer_offset_manager,
100102
broadcast_offset_manager,
101103
message_store,
104+
cold_data_cg_ctr_service: Arc::new(Default::default()),
102105
}
103106
}
104107

@@ -462,9 +465,17 @@ where
462465
);
463466
}
464467
match RequestSource::parse_integer(request_header.request_source) {
465-
RequestSource::ProxyForBroadcast => {}
466-
RequestSource::ProxyForStream => {}
467-
_ => {}
468+
RequestSource::ProxyForBroadcast => {
469+
unimplemented!("ProxyForBroadcast not implement")
470+
}
471+
RequestSource::ProxyForStream => {
472+
unimplemented!("ProxyForStream not implement")
473+
}
474+
_ => self.consumer_manager.compensate_basic_consumer_info(
475+
request_header.consumer_group.as_str(),
476+
ConsumeType::ConsumePassively,
477+
MessageModel::Clustering,
478+
),
468479
}
469480
let has_subscription_flag =
470481
PullSysFlag::has_subscription_flag(request_header.sys_flag as u32);
@@ -540,7 +551,7 @@ where
540551
);
541552
}
542553
let sgc_ref = subscription_group_config.as_ref().unwrap();
543-
if sgc_ref.consume_broadcast_enable()
554+
if !sgc_ref.consume_broadcast_enable()
544555
&& consumer_group_info.as_ref().unwrap().get_message_model()
545556
== MessageModel::Broadcasting
546557
{
@@ -671,6 +682,15 @@ where
671682
};
672683

673684
//ColdDataFlow not implement
685+
686+
cfg_if::cfg_if! {
687+
if #[cfg(feature = "local_file_store")] {
688+
if self.cold_data_cg_ctr_service.is_cg_need_cold_data_flow_ctr(request_header.consumer_group.as_str()) {
689+
unimplemented!("ColdDataFlow not implement")
690+
}
691+
}
692+
}
693+
674694
let use_reset_offset_feature = self.broker_config.use_server_side_reset_offset;
675695
let topic = request_header.topic.as_str();
676696
let group = request_header.consumer_group.as_str();
@@ -754,8 +774,8 @@ where
754774
return -1;
755775
}
756776
let consumer_group_info = self.consumer_manager.get_consumer_group_info(group);
757-
let proxy_pull_broadcast = RequestSource::ProxyForBroadcast.get_value()
758-
== request_header.request_source.unwrap_or(-2);
777+
let proxy_pull_broadcast = RequestSource::ProxyForBroadcast
778+
== From::from(request_header.request_source.unwrap_or(-2));
759779

760780
if is_broadcast(proxy_pull_broadcast, consumer_group_info.as_ref()) {
761781
let client_id = if proxy_pull_broadcast {
@@ -821,12 +841,67 @@ pub(crate) fn is_broadcast(
821841
proxy_pull_broadcast: bool,
822842
consumer_group_info: Option<&ConsumerGroupInfo>,
823843
) -> bool {
824-
match consumer_group_info {
825-
Some(info) => {
826-
proxy_pull_broadcast
827-
|| (info.get_message_model() == MessageModel::Broadcasting
828-
&& info.get_consume_type() == ConsumeType::ConsumePassively)
829-
}
830-
None => proxy_pull_broadcast,
844+
proxy_pull_broadcast
845+
|| consumer_group_info.map_or(false, |info| {
846+
matches!(info.get_message_model(), MessageModel::Broadcasting)
847+
&& matches!(info.get_consume_type(), ConsumeType::ConsumePassively)
848+
})
849+
}
850+
851+
#[cfg(test)]
852+
mod tests {
853+
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
854+
use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
855+
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
856+
857+
use super::*;
858+
use crate::client::consumer_group_info::ConsumerGroupInfo;
859+
860+
#[test]
861+
fn returns_true_for_proxy_pull_broadcast() {
862+
let result = is_broadcast(true, None);
863+
assert!(
864+
result,
865+
"Should return true when proxy_pull_broadcast is true"
866+
);
867+
}
868+
869+
#[test]
870+
fn returns_false_for_non_broadcast_and_active_consumption() {
871+
let consumer_group_info = ConsumerGroupInfo::new(
872+
"test_group".to_string(),
873+
ConsumeType::ConsumeActively,
874+
MessageModel::Clustering,
875+
ConsumeFromWhere::ConsumeFromLastOffset,
876+
);
877+
let result = is_broadcast(false, Some(&consumer_group_info));
878+
assert!(
879+
!result,
880+
"Should return false for non-broadcast and active consumption"
881+
);
882+
}
883+
884+
#[test]
885+
fn returns_true_for_broadcast_and_passive_consumption() {
886+
let consumer_group_info = ConsumerGroupInfo::new(
887+
"test_group".to_string(),
888+
ConsumeType::ConsumePassively,
889+
MessageModel::Broadcasting,
890+
ConsumeFromWhere::ConsumeFromLastOffset,
891+
);
892+
let result = is_broadcast(false, Some(&consumer_group_info));
893+
assert!(
894+
result,
895+
"Should return true for broadcast and passive consumption"
896+
);
897+
}
898+
899+
#[test]
900+
fn returns_false_when_no_consumer_group_info_provided() {
901+
let result = is_broadcast(false, None);
902+
assert!(
903+
!result,
904+
"Should return false when no consumer group info is provided"
905+
);
831906
}
832907
}

0 commit comments

Comments
 (0)