fix: builtin bridge disconnected when under heavy load#3122
fix: builtin bridge disconnected when under heavy load#3122didier-wenzek merged 12 commits intothin-edge:mainfrom
Conversation
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Spawning a task to acknowledge a message was done to avoid blocking the cloud AsyncClient when the local AsyncClient is too busy or worse blocked trying to forward messages to the cloud AsyncClient. However, this simply defers the issue when the local AsyncClient is blocked. Spawned tasks are pilling up and no messages is actually acknowledged. Instead of using an implicit unbounded queue of tasks, one increases the size of the channels used by the local and cloud AsyncClients. For now, this increase size is just a guess large enough to have the unit tests pass. => Following steps are required to align these buffers to the number of in-flight messages. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files📢 Thoughts on this report? Let us know! |
Robot Results
|
jarhodes314
left a comment
There was a problem hiding this comment.
Given how problematic this issue seemed at first, this is a surprisingly elegant solution. I've left some comments about improving the bidirectional channel API now it's less general-purpose.
| ] | ||
| } | ||
|
|
||
| struct BidirectionalChannelHalf<T> { |
There was a problem hiding this comment.
Following looking at the way we now send, it feels like BidirectionalChannelHalf is a bit too generic. If you used it in the obvious way, just calling send and recv, it wouldn't do anything. It could do with renaming to reflect this change.
|
|
||
| Event::Outgoing(Outgoing::PingReq) => { | ||
| let waiting = forward_pkid_to_received_msg.len(); | ||
| info!("Bridge {name} connection: received={received} forwarded={forwarded} published={published} waiting={waiting} acknowledged={acknowledged} finalized={finalized} ignored={ignored}"); |
There was a problem hiding this comment.
Is this meant to be at INFO level?
There was a problem hiding this comment.
It is. Except that I wonder if this appropriate to log this progress every minute. Every hours might be useful.
My only concern here is that this is not logged when the builtin bridge is blocked! i.e. when this would be the more insightful :-(
There was a problem hiding this comment.
My only concern here is that this is not logged when the builtin bridge is blocked! i.e. when this would be the more insightful :-(
Ah yes, if we had a "log when you're blocked for a while" capability, that would make debugging the bridge far easier.
There was a problem hiding this comment.
I will remove this not so usefull info! message.
| } | ||
| }; | ||
| debug!("Received notification ({name}) {notification:?}"); | ||
| debug!("Bridge {name} connection: received={received} forwarded={forwarded} published={published} waiting={} acknowledged={acknowledged} finalized={finalized} ignored={ignored}", |
There was a problem hiding this comment.
This is a really nice way to observe what's going on without resorting to logging every event and trying to manually reconstruct what's going on.
| if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) { | ||
| let target = target.clone(); | ||
| tokio::spawn(async move { target.ack(&msg).await.unwrap() }); | ||
| if let Err(err) = target.ack(&msg).await { |
There was a problem hiding this comment.
Can ack not block in the same way publish does?
There was a problem hiding this comment.
I previously observed blocked ack when the channel cycle was only made of bounded channels. Now that one of the channel of this cycle is unbounded, I don't think ack might block. However, to be sure, I'm missing a better understanding of rumttc internals.
That said, now that BidirectionalChannelHalf owns the target, we can do the same for ack as for pub.
=> I will do that.
jarhodes314
left a comment
There was a problem hiding this comment.
This looks really good, and the new API now makes things largely simpler. The improved debug logging is also really useful. :)
| } else { | ||
| // Being not forwarded to this bridge target | ||
| // The message has to be acknowledged | ||
| recv_client.ack(&publish).await.unwrap() |
There was a problem hiding this comment.
Good point. This shouldn't ever be the case, but it is the correct way to handle it. We should be able to convert any incoming message as the subscription is based on the mapping rules.
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
A task is spawn by each half bridge to - collect all the messages received from the local MQTT connection (resp from the remote MQTT connection) - forward these messages to the remote MQTT connection (resp to the local MQTT connection) - forward also these messages to the companion half bridge The goal is to make sure the two event loops consuming the messages from the rumttc::AsyncClient are never blocked. For that reason, this new background task uses an unbounded input channel. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The BidirectionalChannelHalf API currently exposes internal details of the messages sent over the misc channels, making its use combersome and difficult to understand. As a first step, the duplication of the message (to be sent over MQTT and to the companion half bridge) is now done under the hood. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
As suggested by @jarhodes314. Let BidirectionalChannelHalf::new take the AsyncClient target and spawn the publisher loop. Doing so, the unbounded receiver can be directly passed to spawn_publisher with no need to be temporarily store in the BidirectionalChannelHalf struct. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The generic `send` method has been replaced by explicit `publish` and `ack` methods. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
This struct is now used as an AsyncClient, the fact there is a bidirectional channel being more an internal details. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
So the BridgeHealthMonitor doesn't have to know the internal representation of the BridgeMessage used to tee messages over the MQTT target and the half bridge companion. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The counts of messages forwarded to the target and of those properly acknowledged to the source (i.e. finalized), where optimistic: i.e. counting messages pushed in the processing queue and not those actually processed. This is now fixed. Has also been fixed, the received, published and acknowledged counters excluding bridge unrelated messages (such as health messages). Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
796feca to
80be771
Compare
Proposed changes
When a large amount of messages has to be published from the device to the cloud (say after a network outage),
the builtin bridge fails to cope with the load and reaches a point where no more progress is made. After a while, the local MQTT broker closes the builtin bridge connection, with an error:
The root cause is a dead lock inside the builtin bridge. The two main tasks (one publishing local messages to the remote MQTT endpoint, the other forwarding the remote message acknowledgements to the local broker) are blocked waiting for the other to make progress.
What makes the issue not so easy to fix is that the MQTT library used by thin-edge (
rumqttc) provides no way to reduce the pressure. When a message is received from the local MQTT connection and delivered to the builtin bridge, then the bridge can only acknowledge the message (and this should be done only when properly published and acknowledged by the cloud endpoint) or drop it (by pretending it has been processed). The bridge has no way to tellrumqttcthat a message cannot be processed right now. Until the message is properly acknowledged, the broker keeps sending the same message again and again torumqttcbut those retries are not forwarded to the bridge.So, what is proposed here is hold in memory all the messages until they are properly processed (received from local, published to cloud, acknowledged by cloud, acknowledged to local). This is done using
mpsc::unbounded().Notes:
This is to ensure that the rumqtt channel and the half-bridge companion channel are in sync.
Types of changes
Paste Link to the issue
#3083
Checklist
cargo fmtas mentioned in CODING_GUIDELINEScargo clippyas mentioned in CODING_GUIDELINESFurther comments