@@ -50,6 +50,7 @@ use tracing::warn;
5050
5151use crate :: client:: consumer_group_info:: ConsumerGroupInfo ;
5252use crate :: client:: manager:: consumer_manager:: ConsumerManager ;
53+ use crate :: coldctr:: cold_data_cg_ctr_service:: ColdDataCgCtrService ;
5354use crate :: coldctr:: cold_data_pull_request_hold_service:: NO_SUSPEND_KEY ;
5455use crate :: filter:: expression_for_retry_message_filter:: ExpressionForRetryMessageFilter ;
5556use crate :: filter:: expression_message_filter:: ExpressionMessageFilter ;
@@ -73,6 +74,7 @@ pub struct PullMessageProcessor<MS> {
7374 consumer_offset_manager : Arc < ConsumerOffsetManager > ,
7475 broadcast_offset_manager : Arc < BroadcastOffsetManager > ,
7576 message_store : Arc < MS > ,
77+ cold_data_cg_ctr_service : Arc < ColdDataCgCtrService > ,
7678}
7779
7880impl < MS > PullMessageProcessor < MS > {
@@ -99,6 +101,7 @@ impl<MS> PullMessageProcessor<MS> {
99101 consumer_offset_manager,
100102 broadcast_offset_manager,
101103 message_store,
104+ cold_data_cg_ctr_service : Arc :: new ( Default :: default ( ) ) ,
102105 }
103106 }
104107
@@ -462,9 +465,17 @@ where
462465 ) ;
463466 }
464467 match RequestSource :: parse_integer ( request_header. request_source ) {
465- RequestSource :: ProxyForBroadcast => { }
466- RequestSource :: ProxyForStream => { }
467- _ => { }
468+ RequestSource :: ProxyForBroadcast => {
469+ unimplemented ! ( "ProxyForBroadcast not implement" )
470+ }
471+ RequestSource :: ProxyForStream => {
472+ unimplemented ! ( "ProxyForStream not implement" )
473+ }
474+ _ => self . consumer_manager . compensate_basic_consumer_info (
475+ request_header. consumer_group . as_str ( ) ,
476+ ConsumeType :: ConsumePassively ,
477+ MessageModel :: Clustering ,
478+ ) ,
468479 }
469480 let has_subscription_flag =
470481 PullSysFlag :: has_subscription_flag ( request_header. sys_flag as u32 ) ;
@@ -540,7 +551,7 @@ where
540551 ) ;
541552 }
542553 let sgc_ref = subscription_group_config. as_ref ( ) . unwrap ( ) ;
543- if sgc_ref. consume_broadcast_enable ( )
554+ if ! sgc_ref. consume_broadcast_enable ( )
544555 && consumer_group_info. as_ref ( ) . unwrap ( ) . get_message_model ( )
545556 == MessageModel :: Broadcasting
546557 {
@@ -671,6 +682,15 @@ where
671682 } ;
672683
673684 //ColdDataFlow not implement
685+
686+ cfg_if:: cfg_if! {
687+ if #[ cfg( feature = "local_file_store" ) ] {
688+ if self . cold_data_cg_ctr_service. is_cg_need_cold_data_flow_ctr( request_header. consumer_group. as_str( ) ) {
689+ unimplemented!( "ColdDataFlow not implement" )
690+ }
691+ }
692+ }
693+
674694 let use_reset_offset_feature = self . broker_config . use_server_side_reset_offset ;
675695 let topic = request_header. topic . as_str ( ) ;
676696 let group = request_header. consumer_group . as_str ( ) ;
@@ -754,8 +774,8 @@ where
754774 return -1 ;
755775 }
756776 let consumer_group_info = self . consumer_manager . get_consumer_group_info ( group) ;
757- let proxy_pull_broadcast = RequestSource :: ProxyForBroadcast . get_value ( )
758- == request_header. request_source . unwrap_or ( -2 ) ;
777+ let proxy_pull_broadcast = RequestSource :: ProxyForBroadcast
778+ == From :: from ( request_header. request_source . unwrap_or ( -2 ) ) ;
759779
760780 if is_broadcast ( proxy_pull_broadcast, consumer_group_info. as_ref ( ) ) {
761781 let client_id = if proxy_pull_broadcast {
@@ -821,12 +841,67 @@ pub(crate) fn is_broadcast(
821841 proxy_pull_broadcast : bool ,
822842 consumer_group_info : Option < & ConsumerGroupInfo > ,
823843) -> bool {
824- match consumer_group_info {
825- Some ( info) => {
826- proxy_pull_broadcast
827- || ( info. get_message_model ( ) == MessageModel :: Broadcasting
828- && info. get_consume_type ( ) == ConsumeType :: ConsumePassively )
829- }
830- None => proxy_pull_broadcast,
844+ proxy_pull_broadcast
845+ || consumer_group_info. map_or ( false , |info| {
846+ matches ! ( info. get_message_model( ) , MessageModel :: Broadcasting )
847+ && matches ! ( info. get_consume_type( ) , ConsumeType :: ConsumePassively )
848+ } )
849+ }
850+
851+ #[ cfg( test) ]
852+ mod tests {
853+ use rocketmq_common:: common:: consumer:: consume_from_where:: ConsumeFromWhere ;
854+ use rocketmq_remoting:: protocol:: heartbeat:: consume_type:: ConsumeType ;
855+ use rocketmq_remoting:: protocol:: heartbeat:: message_model:: MessageModel ;
856+
857+ use super :: * ;
858+ use crate :: client:: consumer_group_info:: ConsumerGroupInfo ;
859+
860+ #[ test]
861+ fn returns_true_for_proxy_pull_broadcast ( ) {
862+ let result = is_broadcast ( true , None ) ;
863+ assert ! (
864+ result,
865+ "Should return true when proxy_pull_broadcast is true"
866+ ) ;
867+ }
868+
869+ #[ test]
870+ fn returns_false_for_non_broadcast_and_active_consumption ( ) {
871+ let consumer_group_info = ConsumerGroupInfo :: new (
872+ "test_group" . to_string ( ) ,
873+ ConsumeType :: ConsumeActively ,
874+ MessageModel :: Clustering ,
875+ ConsumeFromWhere :: ConsumeFromLastOffset ,
876+ ) ;
877+ let result = is_broadcast ( false , Some ( & consumer_group_info) ) ;
878+ assert ! (
879+ !result,
880+ "Should return false for non-broadcast and active consumption"
881+ ) ;
882+ }
883+
884+ #[ test]
885+ fn returns_true_for_broadcast_and_passive_consumption ( ) {
886+ let consumer_group_info = ConsumerGroupInfo :: new (
887+ "test_group" . to_string ( ) ,
888+ ConsumeType :: ConsumePassively ,
889+ MessageModel :: Broadcasting ,
890+ ConsumeFromWhere :: ConsumeFromLastOffset ,
891+ ) ;
892+ let result = is_broadcast ( false , Some ( & consumer_group_info) ) ;
893+ assert ! (
894+ result,
895+ "Should return true for broadcast and passive consumption"
896+ ) ;
897+ }
898+
899+ #[ test]
900+ fn returns_false_when_no_consumer_group_info_provided ( ) {
901+ let result = is_broadcast ( false , None ) ;
902+ assert ! (
903+ !result,
904+ "Should return false when no consumer group info is provided"
905+ ) ;
831906 }
832907}
0 commit comments