Skip to content

refactor: support dynamic MQTT subscriptions in mqtt-channel/tedge-mqtt-ext#3661

Merged
jarhodes314 merged 18 commits intothin-edge:mainfrom
jarhodes314:refactor/dynamic-mqtt-subs
Jun 20, 2025
Merged

refactor: support dynamic MQTT subscriptions in mqtt-channel/tedge-mqtt-ext#3661
jarhodes314 merged 18 commits intothin-edge:mainfrom
jarhodes314:refactor/dynamic-mqtt-subs

Conversation

@jarhodes314
Copy link
Copy Markdown
Contributor

@jarhodes314 jarhodes314 commented Jun 2, 2025

Proposed changes

For the custom mapper, the user specifies which topics to subscribe the mapping pipelines to. In order to support dynamically reloading these pipelines, we need to be able to update the subscriptions. This was originally done in #3626, but this PR extracts out the changes for separate review.

To properly support dynamic subscriptions, the list of subscribers and their topic filters has been replaced with a trie. The general idea is that this structure maps topics to ids of subscribed clients, allowing us to efficiently retrieve the subscribed clients without checking every possible subscription. It manages which topics we subscribe to, ensuring we subscribe to a minimal set of topics to cover the currently subscribed clients, and (crucially) allows us to unsubscribe when topics are no longer consumed. Since this datastructure is separate, the goal is to have 100% unit test coverage here before merging (at the time of writing it is close to this, but not quite there).

How the trie is structured is detailed in

/// A Trie for matching incoming MQTT messages with their subscribers
///
/// # Structure
/// Each node of the trie contains:
/// - a list of subscribers to the current topic
/// - a map of segments to trie nodes
///
/// As an example, if we subscribed a subscriber `"tedge-mapper"` to topic
/// `c8y/s/us`, the structure would look like:
///
/// ```text
/// {
/// "c8y": {
/// subscribers: [],
/// sub_nodes: {
/// "s": {
/// subscribers: [],
/// sub_nodes: {
/// "us": {
/// subscribers: ["tedge-mapper"],
/// sub_nodes: {}
/// }
/// }
/// }
/// }
/// }
/// }
/// ```
///
/// In practice, this is achieved by having a root node that always has no
/// subscribers. What is shown above is the `sub_nodes` field of the root node.
///
/// # Subscription management
/// One of the requirements for the tedge MQTT actor is to manage subscriptions
/// from a bunch of clients and process them as a single MQTT channel client.
/// This means the actor maintains a minimal set of MQTT topic subscriptions to
/// cover all possible messages to its clients.
///
/// For example, if `client-a` subscribes to `a/b/c` and `client-b` subscribes
/// to `a/#`, the actor should subscribe to `a/#` only, since this wildcard
/// topic captures all messages on `a/b/c`, so we don't require a separate
/// subscription.
///
/// To allow the actor to subscribe/unsubscribe when appropriate,
/// [MqtTrie::insert] and [MqtTrie::remove] both return [SubscriptionDiff]
/// objects. This returns the subscribe/unsubscribe requests that need to be
/// made to the MQTT broker following the internal subscription change.
///
/// Here are some examples of diffs that are returned
///
/// ```
/// # use tedge_mqtt_ext::trie::*;
///
/// let mut t = MqtTrie::default();
/// // First subscriber -> subscribe to that topic
/// assert_eq!(t.insert("a/b", 1), SubscriptionDiff { subscribe: ["a/b".into()].into(), unsubscribe: [].into() });
/// // Another subscriber to the same topics -> don't need to change subscriptions
/// assert_eq!(t.insert("a/b", 2), SubscriptionDiff { subscribe: [].into(), unsubscribe: [].into() });
/// // Subscriber to a different topic -> subscribe to that topic
/// assert_eq!(t.insert("a", 1), SubscriptionDiff { subscribe: ["a".into()].into(), unsubscribe: [].into() });
/// // Subscriber to a segment wildcard -> subscribe to that topic, unsubscribe from static topic
/// assert_eq!(t.insert("a/+", 1), SubscriptionDiff { subscribe: ["a/+".into()].into(), unsubscribe: ["a/b".into()].into() });
/// // Subscriber to a wildcard -> subscribe to that topic and unsubscribe from the matching ones
/// // Don't unsubscribe from the already unsubscribed a/b topic though
/// assert_eq!(t.insert("#", 1), SubscriptionDiff { subscribe: ["#".into()].into(), unsubscribe: ["a".into(), "a/+".into()].into() });
/// ```
///
/// It is still possible to end up with overlapping subscriptions via this
/// method. For instance, `a/+/c` and `a/b/+` both subscribe to messages on
/// `a/b/c`, but aren't overlapping. Currently, [MqtTrie] handles this by
/// subscribing to both `a/+/c` and `a/b/+`.

One thing this PR doesn't do currently is handle retain messages. The effect of this is that if client is subscribed to a topic with a retain message and then another client subscribes, this new client does not receive the retain message since no MQTT subscription is created in this case.

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@codecov
Copy link
Copy Markdown

codecov bot commented Jun 2, 2025

Codecov Report

Attention: Patch coverage is 97.71739% with 42 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
crates/extensions/tedge_mqtt_ext/src/lib.rs 91.42% 23 Missing and 10 partials ⚠️
crates/common/mqtt_channel/src/connection.rs 90.38% 1 Missing and 4 partials ⚠️
crates/common/mqtt_channel/src/tests.rs 98.88% 1 Missing ⚠️
crates/common/mqtt_channel/src/topics.rs 83.33% 1 Missing ⚠️
crates/extensions/tedge_mqtt_ext/src/tests.rs 98.71% 0 Missing and 1 partial ⚠️
crates/extensions/tedge_mqtt_ext/src/trie.rs 99.91% 1 Missing ⚠️

📢 Thoughts on this report? Let us know!

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@jarhodes314 jarhodes314 requested a review from a team as a code owner June 2, 2025 14:24
@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request June 2, 2025 14:24 — with GitHub Actions Inactive
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Jun 2, 2025

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
651 0 3 651 100 1h57m42.192897s

@Bravo555 Bravo555 self-assigned this Jun 2, 2025
///
/// Default: An empty topic list
pub subscriptions: TopicFilter,
pub subscriptions: Arc<Mutex<TopicFilter>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the subscriptions are never updated through the config but using the connection, I would consider to remove the use of interior mutability here, and to rather use these subscriptions as an initial value on connect.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand. The reason this is mutable is because we need to maintain the list of subscribed topics so we can resubscribe if the connection drops at any point.


*** Variables ***
${CERT_TEMPLATE} /etc/tedge/hsm/cert.template # created by init_softhsm script
${CERT_TEMPLATE} /etc/tedge/hsm/cert.template # created by init_softhsm script
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added commit resolving this to my PR here
2f09de6

Copy link
Copy Markdown
Member

@Bravo555 Bravo555 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is a standalone PR, I'd also like to see some tests, unless for some reason it's especially unfeasible to add them right now.

@Bravo555 Bravo555 removed their assignment Jun 3, 2025
Copy link
Copy Markdown
Contributor

@albinsuresh albinsuresh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some integration tests in tests.rs to showcase the usage of the dynamic subscribe/unsubscribe feature would be good, as a system test probably isn't feasible in this state.

@reubenmiller reubenmiller added theme:mqtt Theme: mqtt and mosquitto related topics refactoring Developer value labels Jun 5, 2025
@jarhodes314 jarhodes314 marked this pull request as draft June 5, 2025 13:53
Copy link
Copy Markdown
Contributor

@didier-wenzek didier-wenzek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The struct MqtTrie introduced here is the right abstraction to address dynamic MQTT subscriptions, although a bit involved. Nicely implemented with a good code coverage.

}
}

// TODO this should only accept valid message topic, not containing wildcards
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, this is check has already be done (because the message has been received from MQTT or wrapped into a Topic).

So I would simply enforced that at the top level: MqtTrie::matches<'a>(&'a self, topic: &Topic) -> Vec<&'a T> and adds a debug_assert! here on the head.

(Some(_), None, _) => break None,
(None, None, Some(Winner::This)) => break Some(Ordering::Greater),
(None, None, Some(Winner::Other)) => break Some(Ordering::Less),
(None, None, None) => break Some(Ordering::Equal),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not easy to grasp the logic behind this long list of cases.

The tests help a lot, though.

But is my understand correct? Is the intent to define topic_filter_1 <= topic_filter_2 as "any topic accepted by topic_filter_1 is also accepted by topic_filter_2"?

I would add a doc comment to clarify this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some missing tests 236a2ac

Copy link
Copy Markdown
Contributor Author

@jarhodes314 jarhodes314 Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to add a doc comment, although I have now grouped and commented the cases to broadly explain why we need them. Your understanding is correct though

jarhodes314 and others added 15 commits June 19, 2025 09:26
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
This is an attempt to fix the existing issues with dynamic
subscriptions, as well as hopefully improving the performance when
matching messages against a large number of subscriptions.

Still TODO:
- Integrate this with the MQTT actor, so we can realise these benefits
- Document the datastructure and its methods

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…work, currently no way to actually enact the dynamic subscription changes from the client

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…ribers

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
#[test]
fn issue_1() {
let mut t = MqtTrie::default();
t.insert("/#", 0);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never realised that the root topic also could be empty, until seeing this test. What's weird is that both Topic::new("") and Topic::new("/") are valid topics, but neither Topic::new("/+") nor Topic::new("/#") are valid. And yet the trie accepts them. Hope that doesn't break any of the assumptions.

But that makes me wonder how the MQTT actor should react to invalid topics in the SubscriptionRequest, since SubscriptionDiff also stores the subscriptions as Hashset<String> and not Hashset<Topic>.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They aren't valid topics since they contain wildcards, but they are valid topic filters to subscribe to. So you can't publish a message to /# but you can subscribe to it and receive a message on e.g. /a or the empty topic name ``

Copy link
Copy Markdown
Contributor

@albinsuresh albinsuresh Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but neither Topic::new("/+") nor Topic::new("/#") are valid

Aah yes, my bad. Wrongly used Topic instead of TopicFilter.

But that makes me wonder how the MQTT actor should react to invalid topics in the SubscriptionRequest

I see that the trie is still silently accepting bad topic filters like a/#/a but internally keeping it as a/#. We probably need to use stronger types like TopicFilter at least in the SubscriptionRequest to prevent that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that the trie is silently accepting bad topic filters like a/#/a but internally keeping it as a/#. We probably need to use stronger types like TopicFilter to prevent that.

Yeah, but only at the entry point of the API i.e the 3 methods of MqtTrie.

See also for a similar comment: #3661 (comment)

This fixes the handling of subscriptions when we clone mqtt_channel::Config. With the previous behaviour, any `Config` clones shared all their subscriptions.
@jarhodes314 jarhodes314 force-pushed the refactor/dynamic-mqtt-subs branch from cf6005f to 1b249b3 Compare June 20, 2025 12:04
@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request June 20, 2025 12:04 — with GitHub Actions Inactive
@jarhodes314 jarhodes314 added this pull request to the merge queue Jun 20, 2025
Merged via the queue into thin-edge:main with commit bcb8c1b Jun 20, 2025
34 checks passed
@reubenmiller reubenmiller mentioned this pull request Jul 3, 2025
11 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

refactoring Developer value theme:mqtt Theme: mqtt and mosquitto related topics

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants