Simplify the operation handling code by replacing fragmented control flow with regular async/await #2881
Bravo555 wants to merge 6 commits into thin-edge:main
Conversation
```rust
type C8yMapperOutput = MqttMessage;

#[derive(Clone)]
struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>);
```
Any specific reason you couldn't just freely clone the senders and use them independently? I see that you're already planning on exploring that option, but just wondering why you chose this path first. Faced any issues with cloned senders?
I did that first because I knew I'd need Mutexes for synchronization of other parts and, to be honest, I forgot that our senders are Clone, so I thought a Mutex was the only option, but that's not the case.
> to be honest, I forgot that our senders are Clone, so I thought Mutex is the only option, but it's not the case
That's not entirely correct. Because our `LoggingSender<MqttMessage>` internally uses `futures::channel::mpsc::Sender`, whose send takes `&mut self`, any method that uses the sender needs to be `&mut self` as well. However, if we put the sender behind a Mutex, we can implement `send(&self)`, allowing the worker's methods to also be `&self`.
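To illustrate the point, here is a minimal sketch of how a Mutex turns a `&mut self` send into a `&self` one. The types are simplified stand-ins, not the actual thin-edge ones: `InnerSender` plays the role of `LoggingSender<MqttMessage>` with a `&mut self` send, like `futures::channel::mpsc::Sender`.

```rust
use std::sync::{Arc, Mutex};

// Stand-in for LoggingSender<MqttMessage>: send requires &mut self,
// mirroring futures::channel::mpsc::Sender (simplified assumption).
struct InnerSender(Vec<String>);

impl InnerSender {
    fn send(&mut self, msg: String) {
        self.0.push(msg);
    }
}

// Wrapping the sender in a Mutex lets the publisher expose send(&self),
// so worker methods holding it can also take &self and share it via Arc.
#[derive(Clone)]
struct MqttPublisher(Arc<Mutex<InnerSender>>);

impl MqttPublisher {
    fn send(&self, msg: String) {
        // Lock only for the duration of the send, release immediately after.
        self.0.lock().unwrap().send(msg);
    }
}

fn main() {
    let publisher = MqttPublisher(Arc::new(Mutex::new(InnerSender(Vec::new()))));
    let clone = publisher.clone(); // cheap Arc clone, same underlying sender
    publisher.send("first".into());
    clone.send("second".into());
    assert_eq!(publisher.0.lock().unwrap().0.len(), 2);
}
```

The alternative discussed above, cloning the sender itself, avoids the Mutex entirely but brings the `&mut self` requirement back to each holder.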
```rust
while let Some(event) = messages.recv().await {
    let worker = Arc::clone(&worker);

    tokio::spawn(async move {
```
I had a different model in mind where we have independent workers (maybe even actors) for each operation handler, with minimal coordination with the top-level mapper actor/worker, eliminating the need to share and lock the top-level converter. This mapper actor would still coordinate the MQTT messages, convert them to typed operation messages, and dispatch those to the respective operation workers. Once a worker gets an operation message, it should be able to freely exchange messages with the other actors, like the upload/download actors. Extending the usage of the RequestEnvelope beyond the HTTP actors would be key to enabling this.
> I had a different model in mind where we have independent workers (even actors, may be) for each operation handler with minimal coordination with the top level mapper actor/worker, eliminating the need to share and lock the top level converter.
Yes, the idea would be to decouple as much as possible from CumulocityConverter with regard to operation handling, so it can be run separately as a new task, or maybe as a new actor, without locking the rest of the converter. Right now, basically all the functionality is implemented in CumulocityConverter as `&mut self` methods, so it's impossible to run anything concurrently without locking.
Ideally, each operation handler should be as independent as possible, and we should be able to easily reason about each in isolation, without the control flow jumping between actor.rs, converter.rs, and e.g. log_upload.rs.
Right now they're just tokio tasks, because my current goal is just to move the boundary on which we block, but the handlers could definitely be extracted out of the main actor.
> This mapper actor would still coordinate the MQTT messages and convert them to typed operation messages and dispatch that to those respective operation workers. Once the worker gets an operation message, they should be able to freely exchange messages between the other actors like upload/download actors. Extending the usage of the RequestEnvelope beyond the HTTP actors would be key to enabling this.
That's right, the actor which owns the MQTT message box should route these operation state change messages to the operation handlers, which should then be able to do blocking stuff, like waiting for an upload, without synchronizing with the main actor. But I'd say we overuse messages a bit. It's fine to have actors that manage shared mutable state, but for downloading/uploading, what shared state is there that needs to be managed between different downloads/uploads? Why shouldn't we use the downloader/uploader directly instead of going via the actor?
And I believe that sometimes we use an actor only so that we can mock it in tests: instead of having an interface we can mock, we effectively mock the interface by simulating messages being sent in the message boxes. This way of testing is a bit tedious, because you need to handle and ignore unrelated messages, and when messages are not being sent by the actor under test, you sometimes need timeouts, and so on. I think there's room for improvement there.
But that's a discussion for another time, and I expect we'll discuss it more in the future. For the purposes of this PR, the important part is allowing handlers to await either tasks they themselves spawn, or requests sent to other actors via RequestEnvelope.
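The routing model discussed above can be sketched in miniature. This is a hypothetical simplification using std channels and threads in place of tokio tasks and the real message boxes: the routing loop dispatches operation messages to a dedicated worker, which may block without stalling the router.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (config_tx, config_rx) = mpsc::channel::<String>();

    // Worker dedicated to one operation type (e.g. config_snapshot).
    let worker = thread::spawn(move || {
        // Blocking here (e.g. waiting for an upload) only delays this
        // worker, not the routing loop below.
        config_rx
            .iter()
            .map(|msg| format!("handled {msg}"))
            .collect::<Vec<_>>()
    });

    // Router: dispatches typed operation messages to the matching worker,
    // then immediately moves on to the next incoming message.
    for msg in ["config_snapshot/123", "config_snapshot/456"] {
        config_tx.send(msg.to_string()).unwrap();
    }
    drop(config_tx); // close the channel so the worker can finish

    let handled = worker.join().unwrap();
    assert_eq!(handled.len(), 2);
}
```

In the real mapper, the router would be the actor owning the MQTT message box, and the workers would be the per-operation handlers.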
> Why shouldn't we use the downloader/uploader directly instead of going via the actor?
One additional reason, other than message visibility and testability, is controlling bandwidth usage. We don't want too many uploads/downloads happening in parallel, to reduce the load on the network and the disk. Routing them via a centralised downloader/uploader lets us better control the maximum parallelism allowed.
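As a hypothetical sketch of why a central actor caps parallelism (std threads and a shared queue standing in for the real downloader actor): a fixed-size pool draining one queue means at most N transfers run at once, no matter how many are requested.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let max_parallel = 2; // the cap enforced by the central actor
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx));
    let done = Arc::new(Mutex::new(0u32));

    // Pool of max_parallel workers draining a single shared queue.
    let workers: Vec<_> = (0..max_parallel)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let done = Arc::clone(&done);
            thread::spawn(move || loop {
                // Take the next job; release the queue lock before working.
                let job = { rx.lock().unwrap().recv() };
                match job {
                    Ok(_id) => *done.lock().unwrap() += 1, // simulate a download
                    Err(_) => break, // queue closed, no more jobs
                }
            })
        })
        .collect();

    // Five downloads requested, but never more than two in flight.
    for id in 0..5 {
        tx.send(id).unwrap();
    }
    drop(tx);
    for w in workers {
        w.join().unwrap();
    }
    assert_eq!(*done.lock().unwrap(), 5);
}
```

Calling the downloader directly from each handler would instead give one transfer per in-flight operation, with no global cap.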
```rust
url: tedge_file_url.clone(),
file_dir: destination_dir,
let downloader =
    Downloader::new(destination_path.clone(), self.config.identity.clone());
```
I believe we can continue using the downloader actor here by using a ClientMessageBox to it, so that we can await the response receiver inline. The c8y_firmware_manager is already using the same approach.
That's right, we could use the ClientMessageBox here, but I also wanted to explore whether we could just use the downloader directly, without relying on an outside actor. I'd say it has the advantage of decreasing coupling. The disadvantage is that the messages are no longer visible, but does any code other than testing code need them to be visible? If not, then maybe we don't need to go via the actor.
But the main reason I did it like this was to demonstrate that after the refactor this becomes possible, whereas currently it would block. Now we can easily swap this for an EnvelopeSender if we want to go via the actor.
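The request/response pattern being discussed, awaiting a reply inline where the request was sent rather than in a top-level receiver, can be sketched like this. The names echo the real RequestEnvelope, but the types are simplified stand-ins using std channels instead of the real actor runtime.

```rust
use std::sync::mpsc;
use std::thread;

// The envelope carries a reply-to sender, so the responding actor knows
// where to deliver the result (simplified stand-in for RequestEnvelope).
struct RequestEnvelope {
    request: String,
    reply_to: mpsc::Sender<String>,
}

fn main() {
    let (actor_tx, actor_rx) = mpsc::channel::<RequestEnvelope>();

    // Downloader-like actor: serves requests and replies to each envelope.
    let actor = thread::spawn(move || {
        for env in actor_rx {
            env.reply_to
                .send(format!("downloaded {}", env.request))
                .unwrap();
        }
    });

    // Client side: send a request and wait for the response inline,
    // instead of handling it later in a top-level receive loop.
    let (reply_tx, reply_rx) = mpsc::channel();
    actor_tx
        .send(RequestEnvelope {
            request: "tedge.conf".into(),
            reply_to: reply_tx,
        })
        .unwrap();
    let response = reply_rx.recv().unwrap();
    assert_eq!(response, "downloaded tedge.conf");

    drop(actor_tx); // close the channel so the actor loop ends
    actor.join().unwrap();
}
```

This keeps the messages visible (they still flow through the actor) while avoiding the control-flow fragmentation of a top-level dispatch.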
An additional benefit of going via the downloader actor is described in this comment: #2881 (comment)
```rust
let http_proxy = C8YHttpProxy::new(http);
let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone());
let upload_sender = uploader.connect_client(box_builder.get_sender().sender_clone());
let uploader = ClientMessageBox::new(uploader);
```
We should be able to do the same for the downloader as well.
We can; the explanation for why it was not done for now is in this comment: #2881 (comment)
To be more confident about future changes to the c8y mapper and the converter, we need tests checking that operations happen concurrently. This change introduces a test that only covers the config_snapshot operation, but it fails as expected when regressions are introduced by the previous commits in this PR. Signed-off-by: Marcel Guzik <marcel.guzik@inetum.com>
Closing in favour of #2904, which is a more promising and feasible approach.
Proposed changes
This PR should make the operation handling simpler to reason about and change, by allowing operation handlers to use `.await` without blocking. The scope of the PR is to show the idea working on the `config_snapshot` and `config_update` operations.

Below I describe the current state, which is subject to change.
Mutexes guard the `MqttPublisher` and `CumulocityConverter`:

- `struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>)` with `fn send(&self)` is OK: we lock the mutex to use the sender and unlock it immediately after sending, so it shouldn't block. The alternative would be to clone the sender for each worker, and I will explore that later.
- `Mutex<CumulocityConverter>` ensures that only a single worker can use `CumulocityConverter` at any given time. This can lead to deadlocks if we're not careful, and currently results in the same behaviour as before, i.e. blocking in the operation handling code blocks processing of other messages, because the lock is held across await points. This is temporary and will have to be fixed, but is used right now so I could show what the full operation handling function looks like without the current fragmentation. A test demonstrating that the converter doesn't block should also be added.
- Moved the `Identity` type to the `download` crate, so I can use it in the next commit.
- `Downloader` is used directly in the operation handler, to show how we can use it without going through the actor: if we only want to download a file via HTTP, and this download shouldn't influence anything else, we can use it directly.
- The `UploaderActor` is used via `ClientMessageBox`, where we receive the response directly in the same place where we `send()`, instead of going to the top-level receiver in the top-level actor, which then needs to decide to call an operation handling function again. This way we still use an uploader actor, with messages sent between them, but don't need to fragment our control flow. This was done to show that we can still exchange messages with other actors from within the proposed new operation handlers.

Next steps
- replace the lock on the MQTT sender with cloned senders (can be done, but will require `&mut self`)
- don't lock the entire `CumulocityConverter`, so operation handling doesn't block

Types of changes
Paste Link to the issue
Checklist
- `cargo fmt` as mentioned in CODING_GUIDELINES
- `cargo clippy` as mentioned in CODING_GUIDELINES

Further comments