@@ -12,11 +12,14 @@ use c8y_auth_proxy::url::ProxyUrlGenerator;
1212use c8y_http_proxy:: handle:: C8YHttpProxy ;
1313use c8y_http_proxy:: messages:: C8YRestRequest ;
1414use c8y_http_proxy:: messages:: C8YRestResult ;
15+ use camino:: Utf8Path ;
1516use std:: path:: PathBuf ;
17+ use std:: sync:: Arc ;
1618use std:: time:: Duration ;
1719use tedge_actors:: fan_in_message_type;
1820use tedge_actors:: Actor ;
1921use tedge_actors:: Builder ;
22+ use tedge_actors:: ChannelError ;
2023use tedge_actors:: CloneSender ;
2124use tedge_actors:: DynSender ;
2225use tedge_actors:: LoggingSender ;
@@ -43,6 +46,7 @@ use tedge_uploader_ext::UploadRequest;
4346use tedge_uploader_ext:: UploadResult ;
4447use tedge_utils:: file:: create_directory_with_defaults;
4548use tedge_utils:: file:: FileError ;
49+ use tokio:: sync:: Mutex ;
4650use tracing:: error;
4751use tracing:: warn;
4852
@@ -60,15 +64,36 @@ pub(crate) type IdDownloadRequest = (CmdId, DownloadRequest);
6064fan_in_message_type ! ( C8yMapperInput [ MqttMessage , FsWatchEvent , SyncComplete , IdUploadResult , IdDownloadResult ] : Debug ) ;
6165type C8yMapperOutput = MqttMessage ;
6266
67+ #[ derive( Clone ) ]
68+ struct MqttPublisher ( Arc < Mutex < LoggingSender < MqttMessage > > > ) ;
69+
70+ impl MqttPublisher {
71+ pub fn new ( mqtt_publisher : LoggingSender < MqttMessage > ) -> Self {
72+ Self ( Arc :: new ( Mutex :: new ( mqtt_publisher) ) )
73+ }
74+
75+ pub async fn send ( & self , message : MqttMessage ) -> Result < ( ) , ChannelError > {
76+ self . 0 . lock ( ) . await . send ( message) . await
77+ }
78+ }
79+
6380pub struct C8yMapperActor {
6481 converter : CumulocityConverter ,
6582 messages : SimpleMessageBox < C8yMapperInput , C8yMapperOutput > ,
66- mqtt_publisher : LoggingSender < MqttMessage > ,
83+ mqtt_publisher : MqttPublisher ,
6784 timer_sender : LoggingSender < SyncStart > ,
6885 bridge_status_messages : SimpleMessageBox < MqttMessage , MqttMessage > ,
6986 c8y_bridge_service_name : String ,
7087}
7188
89+ pub struct C8yMapperWorker {
90+ // converter methods that handle download and upload state changes are mutable, so need to
91+ // synchronize them with a mutex
92+ converter : Mutex < CumulocityConverter > ,
93+ mqtt_publisher : MqttPublisher ,
94+ ops_dir : Arc < Utf8Path > ,
95+ }
96+
7297#[ async_trait]
7398impl Actor for C8yMapperActor {
7499 fn name ( & self ) -> & str {
@@ -95,25 +120,46 @@ impl Actor for C8yMapperActor {
95120 . send ( SyncStart :: new ( SYNC_WINDOW , ( ) ) )
96121 . await ?;
97122
98- while let Some ( event) = self . messages . recv ( ) . await {
99- match event {
100- C8yMapperInput :: MqttMessage ( message) => {
101- self . process_mqtt_message ( message) . await ?;
102- }
103- C8yMapperInput :: FsWatchEvent ( event) => {
104- self . process_file_watch_event ( event) . await ?;
105- }
106- C8yMapperInput :: SyncComplete ( _) => {
107- self . process_sync_timeout ( ) . await ?;
108- }
109- C8yMapperInput :: IdUploadResult ( ( cmd_id, result) ) => {
110- self . process_upload_result ( cmd_id, result) . await ?;
111- }
112- C8yMapperInput :: IdDownloadResult ( ( cmd_id, result) ) => {
113- self . process_download_result ( cmd_id, result) . await ?;
123+ let mut messages = self . messages ;
124+
125+ let ops_dir = Arc :: clone ( & self . converter . config . ops_dir ) ;
126+ let worker = Arc :: new ( C8yMapperWorker {
127+ converter : Mutex :: new ( self . converter ) ,
128+ mqtt_publisher : self . mqtt_publisher ,
129+ ops_dir,
130+ } ) ;
131+
132+ while let Some ( event) = messages. recv ( ) . await {
133+ let worker = Arc :: clone ( & worker) ;
134+
135+ tokio:: spawn ( async move {
136+ match event {
137+ C8yMapperInput :: MqttMessage ( message) => {
138+ // request message
139+ worker. process_mqtt_message ( message) . await ?;
140+ }
141+ C8yMapperInput :: FsWatchEvent ( event) => {
142+ // request message
143+ worker. process_file_watch_event ( event) . await ?;
144+ }
145+ C8yMapperInput :: SyncComplete ( _) => {
146+ // request message
147+ worker. process_sync_timeout ( ) . await ?;
148+ }
149+ C8yMapperInput :: IdUploadResult ( ( cmd_id, result) ) => {
150+ // immediate message
151+ worker. process_upload_result ( cmd_id, result) . await ?;
152+ }
153+ C8yMapperInput :: IdDownloadResult ( ( cmd_id, result) ) => {
154+ // immediate message
155+ worker. process_download_result ( cmd_id, result) . await ?;
156+ }
114157 }
115- }
158+
159+ Ok :: < ( ) , RuntimeError > ( ( ) )
160+ } ) ;
116161 }
162+
117163 Ok ( ( ) )
118164 }
119165}
@@ -130,15 +176,17 @@ impl C8yMapperActor {
130176 Self {
131177 converter,
132178 messages,
133- mqtt_publisher,
179+ mqtt_publisher : MqttPublisher :: new ( mqtt_publisher ) ,
134180 timer_sender,
135181 bridge_status_messages,
136182 c8y_bridge_service_name,
137183 }
138184 }
185+ }
139186
140- async fn process_mqtt_message ( & mut self , message : MqttMessage ) -> Result < ( ) , RuntimeError > {
141- let converted_messages = self . converter . convert ( & message) . await ;
187+ impl C8yMapperWorker {
188+ async fn process_mqtt_message ( & self , message : MqttMessage ) -> Result < ( ) , RuntimeError > {
189+ let converted_messages = self . converter . lock ( ) . await . convert ( & message) . await ;
142190
143191 for converted_message in converted_messages. into_iter ( ) {
144192 self . mqtt_publisher . send ( converted_message) . await ?;
@@ -147,26 +195,21 @@ impl C8yMapperActor {
147195 Ok ( ( ) )
148196 }
149197
150- async fn process_file_watch_event (
151- & mut self ,
152- file_event : FsWatchEvent ,
153- ) -> Result < ( ) , RuntimeError > {
198+ async fn process_file_watch_event ( & self , file_event : FsWatchEvent ) -> Result < ( ) , RuntimeError > {
154199 match file_event. clone ( ) {
155200 FsWatchEvent :: FileCreated ( path)
156201 | FsWatchEvent :: FileDeleted ( path)
157202 | FsWatchEvent :: Modified ( path) => {
158203 // Process inotify events only for the main device at the root operations directory
159204 // directly under /etc/tedge/operations/c8y
160- if path. parent ( ) == Some ( self . converter . config . ops_dir . as_std_path ( ) ) {
161- match process_inotify_events (
162- self . converter . config . ops_dir . as_std_path ( ) ,
163- & path,
164- file_event,
165- ) {
205+ if path. parent ( ) == Some ( self . ops_dir . as_std_path ( ) ) {
206+ match process_inotify_events ( self . ops_dir . as_std_path ( ) , & path, file_event) {
166207 Ok ( Some ( discovered_ops) ) => {
167208 self . mqtt_publisher
168209 . send (
169210 self . converter
211+ . lock ( )
212+ . await
170213 . process_operation_update_message ( discovered_ops) ,
171214 )
172215 . await ?;
@@ -184,9 +227,9 @@ impl C8yMapperActor {
184227 Ok ( ( ) )
185228 }
186229
187- pub async fn process_sync_timeout ( & mut self ) -> Result < ( ) , RuntimeError > {
230+ pub async fn process_sync_timeout ( & self ) -> Result < ( ) , RuntimeError > {
188231 // Once the sync phase is complete, retrieve all sync messages from the converter and process them
189- let sync_messages = self . converter . sync_messages ( ) ;
232+ let sync_messages = self . converter . lock ( ) . await . sync_messages ( ) ;
190233 for message in sync_messages {
191234 self . process_mqtt_message ( message) . await ?;
192235 }
@@ -195,11 +238,17 @@ impl C8yMapperActor {
195238 }
196239
197240 async fn process_upload_result (
198- & mut self ,
241+ & self ,
199242 cmd_id : CmdId ,
200243 upload_result : UploadResult ,
201244 ) -> Result < ( ) , RuntimeError > {
202- match self . converter . pending_upload_operations . remove ( & cmd_id) {
245+ match self
246+ . converter
247+ . lock ( )
248+ . await
249+ . pending_upload_operations
250+ . remove ( & cmd_id)
251+ {
203252 None => error ! ( "Received an upload result for the unknown command ID: {cmd_id}" ) ,
204253 Some ( queued_data) => {
205254 let payload = match queued_data. operation {
@@ -242,25 +291,32 @@ impl C8yMapperActor {
242291 }
243292
244293 async fn process_download_result (
245- & mut self ,
294+ & self ,
246295 cmd_id : CmdId ,
247296 result : DownloadResult ,
248297 ) -> Result < ( ) , RuntimeError > {
249298 // download not from c8y_proxy, check if it was from FTS
250- let operation_result = if let Some ( fts_download ) = self
299+ let fts_download_operation = self
251300 . converter
301+ . lock ( )
302+ . await
252303 . pending_fts_download_operations
253- . remove ( & cmd_id)
254- {
304+ . remove ( & cmd_id) ;
305+
306+ let operation_result = if let Some ( fts_download) = fts_download_operation {
255307 let cmd_id = cmd_id. clone ( ) ;
256308 match fts_download. download_type {
257309 FtsDownloadOperationType :: ConfigDownload => {
258310 self . converter
311+ . lock ( )
312+ . await
259313 . handle_fts_config_download_result ( cmd_id, result, fts_download)
260314 . await
261315 }
262316 FtsDownloadOperationType :: LogDownload => {
263317 self . converter
318+ . lock ( )
319+ . await
264320 . handle_fts_log_download_result ( cmd_id, result, fts_download)
265321 . await
266322 }
0 commit comments