Skip to content

Commit 6b90d51

Browse files
committed
c8y mapper actor: spawn task for handling message
1 parent c4718f5 commit 6b90d51

1 file changed

Lines changed: 95 additions & 39 deletions

File tree

  • crates/extensions/c8y_mapper_ext/src

crates/extensions/c8y_mapper_ext/src/actor.rs

Lines changed: 95 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@ use c8y_auth_proxy::url::ProxyUrlGenerator;
1212
use c8y_http_proxy::handle::C8YHttpProxy;
1313
use c8y_http_proxy::messages::C8YRestRequest;
1414
use c8y_http_proxy::messages::C8YRestResult;
15+
use camino::Utf8Path;
1516
use std::path::PathBuf;
17+
use std::sync::Arc;
1618
use std::time::Duration;
1719
use tedge_actors::fan_in_message_type;
1820
use tedge_actors::Actor;
1921
use tedge_actors::Builder;
22+
use tedge_actors::ChannelError;
2023
use tedge_actors::CloneSender;
2124
use tedge_actors::DynSender;
2225
use tedge_actors::LoggingSender;
@@ -43,6 +46,7 @@ use tedge_uploader_ext::UploadRequest;
4346
use tedge_uploader_ext::UploadResult;
4447
use tedge_utils::file::create_directory_with_defaults;
4548
use tedge_utils::file::FileError;
49+
use tokio::sync::Mutex;
4650
use tracing::error;
4751
use tracing::warn;
4852

@@ -60,15 +64,36 @@ pub(crate) type IdDownloadRequest = (CmdId, DownloadRequest);
6064
fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete, IdUploadResult, IdDownloadResult] : Debug);
6165
type C8yMapperOutput = MqttMessage;
6266

67+
#[derive(Clone)]
68+
struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>);
69+
70+
impl MqttPublisher {
71+
pub fn new(mqtt_publisher: LoggingSender<MqttMessage>) -> Self {
72+
Self(Arc::new(Mutex::new(mqtt_publisher)))
73+
}
74+
75+
pub async fn send(&self, message: MqttMessage) -> Result<(), ChannelError> {
76+
self.0.lock().await.send(message).await
77+
}
78+
}
79+
6380
pub struct C8yMapperActor {
6481
converter: CumulocityConverter,
6582
messages: SimpleMessageBox<C8yMapperInput, C8yMapperOutput>,
66-
mqtt_publisher: LoggingSender<MqttMessage>,
83+
mqtt_publisher: MqttPublisher,
6784
timer_sender: LoggingSender<SyncStart>,
6885
bridge_status_messages: SimpleMessageBox<MqttMessage, MqttMessage>,
6986
c8y_bridge_service_name: String,
7087
}
7188

89+
pub struct C8yMapperWorker {
90+
// converter methods that handle download and upload state changes are mutable, so need to
91+
// synchronize them with a mutex
92+
converter: Mutex<CumulocityConverter>,
93+
mqtt_publisher: MqttPublisher,
94+
ops_dir: Arc<Utf8Path>,
95+
}
96+
7297
#[async_trait]
7398
impl Actor for C8yMapperActor {
7499
fn name(&self) -> &str {
@@ -95,25 +120,46 @@ impl Actor for C8yMapperActor {
95120
.send(SyncStart::new(SYNC_WINDOW, ()))
96121
.await?;
97122

98-
while let Some(event) = self.messages.recv().await {
99-
match event {
100-
C8yMapperInput::MqttMessage(message) => {
101-
self.process_mqtt_message(message).await?;
102-
}
103-
C8yMapperInput::FsWatchEvent(event) => {
104-
self.process_file_watch_event(event).await?;
105-
}
106-
C8yMapperInput::SyncComplete(_) => {
107-
self.process_sync_timeout().await?;
108-
}
109-
C8yMapperInput::IdUploadResult((cmd_id, result)) => {
110-
self.process_upload_result(cmd_id, result).await?;
111-
}
112-
C8yMapperInput::IdDownloadResult((cmd_id, result)) => {
113-
self.process_download_result(cmd_id, result).await?;
123+
let mut messages = self.messages;
124+
125+
let ops_dir = Arc::clone(&self.converter.config.ops_dir);
126+
let worker = Arc::new(C8yMapperWorker {
127+
converter: Mutex::new(self.converter),
128+
mqtt_publisher: self.mqtt_publisher,
129+
ops_dir,
130+
});
131+
132+
while let Some(event) = messages.recv().await {
133+
let worker = Arc::clone(&worker);
134+
135+
tokio::spawn(async move {
136+
match event {
137+
C8yMapperInput::MqttMessage(message) => {
138+
// request message
139+
worker.process_mqtt_message(message).await?;
140+
}
141+
C8yMapperInput::FsWatchEvent(event) => {
142+
// request message
143+
worker.process_file_watch_event(event).await?;
144+
}
145+
C8yMapperInput::SyncComplete(_) => {
146+
// request message
147+
worker.process_sync_timeout().await?;
148+
}
149+
C8yMapperInput::IdUploadResult((cmd_id, result)) => {
150+
// immediate message
151+
worker.process_upload_result(cmd_id, result).await?;
152+
}
153+
C8yMapperInput::IdDownloadResult((cmd_id, result)) => {
154+
// immediate message
155+
worker.process_download_result(cmd_id, result).await?;
156+
}
114157
}
115-
}
158+
159+
Ok::<(), RuntimeError>(())
160+
});
116161
}
162+
117163
Ok(())
118164
}
119165
}
@@ -130,15 +176,17 @@ impl C8yMapperActor {
130176
Self {
131177
converter,
132178
messages,
133-
mqtt_publisher,
179+
mqtt_publisher: MqttPublisher::new(mqtt_publisher),
134180
timer_sender,
135181
bridge_status_messages,
136182
c8y_bridge_service_name,
137183
}
138184
}
185+
}
139186

140-
async fn process_mqtt_message(&mut self, message: MqttMessage) -> Result<(), RuntimeError> {
141-
let converted_messages = self.converter.convert(&message).await;
187+
impl C8yMapperWorker {
188+
async fn process_mqtt_message(&self, message: MqttMessage) -> Result<(), RuntimeError> {
189+
let converted_messages = self.converter.lock().await.convert(&message).await;
142190

143191
for converted_message in converted_messages.into_iter() {
144192
self.mqtt_publisher.send(converted_message).await?;
@@ -147,26 +195,21 @@ impl C8yMapperActor {
147195
Ok(())
148196
}
149197

150-
async fn process_file_watch_event(
151-
&mut self,
152-
file_event: FsWatchEvent,
153-
) -> Result<(), RuntimeError> {
198+
async fn process_file_watch_event(&self, file_event: FsWatchEvent) -> Result<(), RuntimeError> {
154199
match file_event.clone() {
155200
FsWatchEvent::FileCreated(path)
156201
| FsWatchEvent::FileDeleted(path)
157202
| FsWatchEvent::Modified(path) => {
158203
// Process inotify events only for the main device at the root operations directory
159204
// directly under /etc/tedge/operations/c8y
160-
if path.parent() == Some(self.converter.config.ops_dir.as_std_path()) {
161-
match process_inotify_events(
162-
self.converter.config.ops_dir.as_std_path(),
163-
&path,
164-
file_event,
165-
) {
205+
if path.parent() == Some(self.ops_dir.as_std_path()) {
206+
match process_inotify_events(self.ops_dir.as_std_path(), &path, file_event) {
166207
Ok(Some(discovered_ops)) => {
167208
self.mqtt_publisher
168209
.send(
169210
self.converter
211+
.lock()
212+
.await
170213
.process_operation_update_message(discovered_ops),
171214
)
172215
.await?;
@@ -184,9 +227,9 @@ impl C8yMapperActor {
184227
Ok(())
185228
}
186229

187-
pub async fn process_sync_timeout(&mut self) -> Result<(), RuntimeError> {
230+
pub async fn process_sync_timeout(&self) -> Result<(), RuntimeError> {
188231
// Once the sync phase is complete, retrieve all sync messages from the converter and process them
189-
let sync_messages = self.converter.sync_messages();
232+
let sync_messages = self.converter.lock().await.sync_messages();
190233
for message in sync_messages {
191234
self.process_mqtt_message(message).await?;
192235
}
@@ -195,11 +238,17 @@ impl C8yMapperActor {
195238
}
196239

197240
async fn process_upload_result(
198-
&mut self,
241+
&self,
199242
cmd_id: CmdId,
200243
upload_result: UploadResult,
201244
) -> Result<(), RuntimeError> {
202-
match self.converter.pending_upload_operations.remove(&cmd_id) {
245+
match self
246+
.converter
247+
.lock()
248+
.await
249+
.pending_upload_operations
250+
.remove(&cmd_id)
251+
{
203252
None => error!("Received an upload result for the unknown command ID: {cmd_id}"),
204253
Some(queued_data) => {
205254
let payload = match queued_data.operation {
@@ -242,25 +291,32 @@ impl C8yMapperActor {
242291
}
243292

244293
async fn process_download_result(
245-
&mut self,
294+
&self,
246295
cmd_id: CmdId,
247296
result: DownloadResult,
248297
) -> Result<(), RuntimeError> {
249298
// download not from c8y_proxy, check if it was from FTS
250-
let operation_result = if let Some(fts_download) = self
299+
let fts_download_operation = self
251300
.converter
301+
.lock()
302+
.await
252303
.pending_fts_download_operations
253-
.remove(&cmd_id)
254-
{
304+
.remove(&cmd_id);
305+
306+
let operation_result = if let Some(fts_download) = fts_download_operation {
255307
let cmd_id = cmd_id.clone();
256308
match fts_download.download_type {
257309
FtsDownloadOperationType::ConfigDownload => {
258310
self.converter
311+
.lock()
312+
.await
259313
.handle_fts_config_download_result(cmd_id, result, fts_download)
260314
.await
261315
}
262316
FtsDownloadOperationType::LogDownload => {
263317
self.converter
318+
.lock()
319+
.await
264320
.handle_fts_log_download_result(cmd_id, result, fts_download)
265321
.await
266322
}

0 commit comments

Comments
 (0)