1616 */
1717use rocketmq_common:: common:: message:: message_enum:: MessageRequestMode ;
1818use rocketmq_common:: ArcRefCellWrapper ;
19+ use rocketmq_rust:: Shutdown ;
1920use tracing:: info;
2021use tracing:: warn;
2122
@@ -27,19 +28,25 @@ use crate::factory::mq_client_instance::MQClientInstance;
2728#[ derive( Clone ) ]
2829pub struct PullMessageService {
2930 tx : Option < tokio:: sync:: mpsc:: Sender < Box < dyn MessageRequest + Send + ' static > > > ,
31+ tx_shutdown : Option < tokio:: sync:: broadcast:: Sender < ( ) > > ,
3032}
3133
3234impl PullMessageService {
3335 pub fn new ( ) -> Self {
34- PullMessageService { tx : None }
36+ PullMessageService {
37+ tx : None ,
38+ tx_shutdown : None ,
39+ }
3540 }
3641 pub async fn start ( & mut self , mut instance : ArcRefCellWrapper < MQClientInstance > ) {
3742 let ( tx, mut rx) =
3843 tokio:: sync:: mpsc:: channel :: < Box < dyn MessageRequest + Send + ' static > > ( 1024 * 4 ) ;
44+ let ( mut shutdown, tx_shutdown) = Shutdown :: new ( 1 ) ;
3945 self . tx = Some ( tx) ;
46+ self . tx_shutdown = Some ( tx_shutdown) ;
4047 tokio:: spawn ( async move {
4148 info ! ( ">>>>>>>>>>>>>>>>>>>>>>>PullMessageService started<<<<<<<<<<<<<<<<<<<<<<<<<<<<" ) ;
42- while let Some ( request) = rx. recv ( ) . await {
49+ /* while let Some(request) = rx.recv().await {
4350 if request.get_message_request_mode() == MessageRequestMode::Pull {
4451 let pull_request =
4552 unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
@@ -49,6 +56,32 @@ impl PullMessageService {
4956 unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
5057 PullMessageService::pop_message(pop_request, instance.as_mut()).await;
5158 }
59+ }*/
60+ if shutdown. is_shutdown ( ) {
61+ info ! ( "PullMessageService shutdown" ) ;
62+ return ;
63+ }
64+ loop {
65+ tokio:: select! {
66+ _ = shutdown. recv( ) => {
67+ info!( "PullMessageService shutdown" ) ;
68+ }
69+ Some ( request) = rx. recv( ) => {
70+ if request. get_message_request_mode( ) == MessageRequestMode :: Pull {
71+ let pull_request =
72+ unsafe { * Box :: from_raw( Box :: into_raw( request) as * mut PullRequest ) } ;
73+ PullMessageService :: pull_message( pull_request, instance. as_mut( ) ) . await ;
74+ } else {
75+ let pop_request =
76+ unsafe { * Box :: from_raw( Box :: into_raw( request) as * mut PopRequest ) } ;
77+ PullMessageService :: pop_message( pop_request, instance. as_mut( ) ) . await ;
78+ }
79+ }
80+ }
81+ if shutdown. is_shutdown ( ) {
82+ info ! ( "PullMessageService shutdown" ) ;
83+ break ;
84+ }
5285 }
5386 } ) ;
5487 }
@@ -106,4 +139,10 @@ impl PullMessageService {
106139 warn ! ( "Failed to send pull request to pull_tx, error: {:?}" , e) ;
107140 }
108141 }
142+
143+ pub fn shutdown ( & self ) {
144+ if let Err ( e) = self . tx_shutdown . as_ref ( ) . unwrap ( ) . send ( ( ) ) {
145+ warn ! ( "Failed to send shutdown signal to pull_tx, error: {:?}" , e) ;
146+ }
147+ }
109148}
0 commit comments