
fix: builtin bridge disconnected when under heavy load #3122

Merged
didier-wenzek merged 12 commits into thin-edge:main from
didier-wenzek:fix/builtin-bridge-disconnected
Sep 13, 2024

Conversation

@didier-wenzek
Contributor

Proposed changes

When a large number of messages have to be published from the device to the cloud (say, after a network outage),
the builtin bridge fails to cope with the load and reaches a point where no more progress is made. After a while, the local MQTT broker closes the builtin bridge connection with an error:

1725524939: Client tedge-mapper-bridge-c8y has exceeded timeout, disconnecting.

The root cause is a deadlock inside the builtin bridge. The two main tasks (one publishing local messages to the remote MQTT endpoint, the other forwarding the remote message acknowledgements to the local broker) are each blocked waiting for the other to make progress.

What makes the issue not so easy to fix is that the MQTT library used by thin-edge (rumqttc) provides no way to relieve the pressure. When a message is received from the local MQTT connection and delivered to the builtin bridge, the bridge can only acknowledge the message (which should be done only once it has been properly published to and acknowledged by the cloud endpoint) or drop it (by pretending it has been processed). The bridge has no way to tell rumqttc that a message cannot be processed right now. Until the message is properly acknowledged, the broker keeps resending it to rumqttc, but those retries are not forwarded to the bridge.

So, what is proposed here is to hold all the messages in memory until they are fully processed (received from local, published to cloud, acknowledged by cloud, acknowledged to local). This is done using mpsc::unbounded().

Notes:

  • If the mapper is shut down with pending messages, all of them will be resent by the MQTT broker.
  • Not only has an unbounded channel been introduced; a new background task is also spawned.
    This ensures that the rumqttc channel and the half-bridge companion channel stay in sync.

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

#3083

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s)
  • I ran cargo fmt as mentioned in CODING_GUIDELINES
  • I used cargo clippy as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Spawning a task to acknowledge a message was done to avoid blocking
the cloud AsyncClient when the local AsyncClient is too busy or, worse,
blocked trying to forward messages to the cloud AsyncClient.

However, this simply defers the issue when the local AsyncClient is blocked:
spawned tasks pile up and no message is actually acknowledged.

Instead of using an implicit unbounded queue of tasks,
this increases the size of the channels used by the local and cloud AsyncClients.
For now, the increased size is just a guess, large enough to make the
unit tests pass. Follow-up steps are required to align these buffers
with the number of in-flight messages.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
@codecov

codecov bot commented Sep 12, 2024

Codecov Report

Attention: Patch coverage is 85.71429% with 20 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
crates/extensions/tedge_mqtt_bridge/src/lib.rs | 85.38% | 10 Missing and 9 partials ⚠️
crates/extensions/tedge_mqtt_bridge/src/health.rs | 90.00% | 1 Missing ⚠️

@github-actions
Contributor

github-actions bot commented Sep 12, 2024

Robot Results

✅ Passed | ❌ Failed | ⏭️ Skipped | Total | Pass % | ⏱️ Duration
505 | 0 | 2 | 505 | 100 | 1h24m23.779899s

Contributor

@jarhodes314 jarhodes314 left a comment


Given how problematic this issue seemed at first, this is a surprisingly elegant solution. I've left some comments about improving the bidirectional channel API now that it's less general-purpose.

]
}

struct BidirectionalChannelHalf<T> {
Contributor

Looking at the way we now send, it feels like BidirectionalChannelHalf is a bit too generic. If you used it in the obvious way, just calling send and recv, it wouldn't do anything. It could do with renaming to reflect this change.

Contributor Author

Addressed by:


Event::Outgoing(Outgoing::PingReq) => {
let waiting = forward_pkid_to_received_msg.len();
info!("Bridge {name} connection: received={received} forwarded={forwarded} published={published} waiting={waiting} acknowledged={acknowledged} finalized={finalized} ignored={ignored}");
Contributor

Is this meant to be at INFO level?

Contributor Author

It is. Though I wonder whether it is appropriate to log this progress every minute. Every hour might be more useful.

My only concern here is that this is not logged when the builtin bridge is blocked! i.e. when it would be the most insightful :-(

Contributor

> My only concern here is that this is not logged when the builtin bridge is blocked! i.e. when this would be the more insightful :-(

Ah yes, if we had a "log when you're blocked for a while" capability, that would make debugging the bridge far easier.

Contributor Author

I will remove this not-so-useful info! message.

Contributor Author

Removed: 4cadd6d

}
};
debug!("Received notification ({name}) {notification:?}");
debug!("Bridge {name} connection: received={received} forwarded={forwarded} published={published} waiting={} acknowledged={acknowledged} finalized={finalized} ignored={ignored}",
Contributor

This is a really nice way to observe what's going on without resorting to logging every event and trying to manually reconstruct the state.

@didier-wenzek didier-wenzek changed the title Fix/builtin bridge disconnected fix: builtin bridge disconnected when under heavy load Sep 13, 2024
@didier-wenzek didier-wenzek added bug Something isn't working theme:bridge Built-in (Rust) bridge topics labels Sep 13, 2024
if let Some(msg) = forward_pkid_to_received_msg.remove(&ack_pkid) {
    let target = target.clone();
-   tokio::spawn(async move { target.ack(&msg).await.unwrap() });
+   if let Err(err) = target.ack(&msg).await {
Contributor

Can ack not block in the same way publish does?

Contributor Author

I previously observed blocked ack calls when the channel cycle was only made of bounded channels. Now that one of the channels in this cycle is unbounded, I don't think ack can block. However, to be sure, I would need a better understanding of rumqttc internals.

That said, now that BidirectionalChannelHalf owns the target, we can do the same for ack as for publish. I will do that.

Contributor

@jarhodes314 jarhodes314 left a comment


This looks really good, and the new API makes things much simpler. The improved debug logging is also really useful. :)

} else {
// Being not forwarded to this bridge target
// The message has to be acknowledged
recv_client.ack(&publish).await.unwrap()
Contributor

Good point. This shouldn't ever be the case, but it is the correct way to handle it. We should be able to convert any incoming message as the subscription is based on the mapping rules.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
A task is spawned by each half bridge to
- collect all the messages received from the local MQTT connection
  (resp. from the remote MQTT connection)
- forward these messages to the remote MQTT connection (resp. to the
  local MQTT connection)
- also forward these messages to the companion half bridge

The goal is to make sure the two event loops consuming the messages from
the rumqttc::AsyncClient are never blocked. For that reason, this new
background task uses an unbounded input channel.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The BidirectionalChannelHalf API currently exposes internal details of the
messages sent over the mpsc channels, making its use cumbersome and
difficult to understand.

As a first step, the duplication of the message (to be sent over MQTT
and to the companion half bridge) is now done under the hood.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
As suggested by @jarhodes314.

Let BidirectionalChannelHalf::new take the AsyncClient target and spawn
the publisher loop. Doing so, the unbounded receiver can be directly passed to spawn_publisher
with no need to be temporarily stored in the BidirectionalChannelHalf struct.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The generic `send` method has been replaced by explicit `publish` and `ack` methods.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
This struct is now used as an AsyncClient, the fact that there is a
bidirectional channel being more of an internal detail.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
So the BridgeHealthMonitor doesn't have to know the internal
representation of the BridgeMessage used to tee messages to the MQTT
target and the half-bridge companion.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The counts of messages forwarded to the target
and of those properly acknowledged to the source (i.e. finalized)
were optimistic: they counted messages pushed into the processing queue
rather than those actually processed. This is now fixed.

Also fixed: the received, published and acknowledged counters
now exclude bridge-unrelated messages (such as health messages).

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
@didier-wenzek didier-wenzek force-pushed the fix/builtin-bridge-disconnected branch from 796feca to 80be771 Compare September 13, 2024 16:13
@didier-wenzek didier-wenzek added this pull request to the merge queue Sep 13, 2024
Merged via the queue into thin-edge:main with commit 1fcd3b3 Sep 13, 2024
@didier-wenzek didier-wenzek deleted the fix/builtin-bridge-disconnected branch September 13, 2024 17:18
@jarhodes314 jarhodes314 mentioned this pull request Jan 31, 2025
11 tasks
