refactor: support dynamic MQTT subscriptions in mqtt-channel/tedge-mqtt-ext#3661
Conversation
Codecov ReportAttention: Patch coverage is 📢 Thoughts on this report? Let us know! 🚀 New features to boost your workflow:
|
Robot Results
|
| /// | ||
| /// Default: An empty topic list | ||
| pub subscriptions: TopicFilter, | ||
| pub subscriptions: Arc<Mutex<TopicFilter>>, |
There was a problem hiding this comment.
As the subscriptions are never updated through the config but using the connection, I would consider to remove the use of interior mutability here, and to rather use these subscriptions as an initial value on connect.
There was a problem hiding this comment.
I don't quite understand. The reason this is mutable is because we need to maintain the list of subscribed topics so we can resubscribe if the connection drops at any point.
|
|
||
| *** Variables *** | ||
| ${CERT_TEMPLATE} /etc/tedge/hsm/cert.template # created by init_softhsm script | ||
| ${CERT_TEMPLATE} /etc/tedge/hsm/cert.template # created by init_softhsm script |
Bravo555
left a comment
There was a problem hiding this comment.
As it is a standalone PR, I'd also like to see some tests, unless for some reason it's especially unfeasible to add them right now.
albinsuresh
left a comment
There was a problem hiding this comment.
Adding some integration tests in tests.rs to showcase the usage of the dynamic subscribe/unsubscribe feature would be good, as a system test probably isn't feasible in this state.
didier-wenzek
left a comment
There was a problem hiding this comment.
The struct MqtTrie introduced here is the right abstraction to address dynamic MQTT subscriptions, although a bit involved. Nicely implemented with a good code coverage.
| } | ||
| } | ||
|
|
||
| // TODO this should only accept valid message topic, not containing wildcards |
There was a problem hiding this comment.
In practice, this is check has already be done (because the message has been received from MQTT or wrapped into a Topic).
So I would simply enforced that at the top level: MqtTrie::matches<'a>(&'a self, topic: &Topic) -> Vec<&'a T> and adds a debug_assert! here on the head.
| (Some(_), None, _) => break None, | ||
| (None, None, Some(Winner::This)) => break Some(Ordering::Greater), | ||
| (None, None, Some(Winner::Other)) => break Some(Ordering::Less), | ||
| (None, None, None) => break Some(Ordering::Equal), |
There was a problem hiding this comment.
It's not easy to grasp the logic behind this long list of cases.
The tests help a lot, though.
But is my understand correct? Is the intent to define topic_filter_1 <= topic_filter_2 as "any topic accepted by topic_filter_1 is also accepted by topic_filter_2"?
I would add a doc comment to clarify this.
There was a problem hiding this comment.
Still need to add a doc comment, although I have now grouped and commented the cases to broadly explain why we need them. Your understanding is correct though
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
This is an attempt to fix the existing issues with dynamic subscriptions, as well as hopefully improving the performance when matching messages against a large number of subscriptions. Still TODO: - Integrate this with the MQTT actor, so we can realise these benefits - Document the datastructure and its methods Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…work, currently no way to actually enact the dynamic subscription changes from the client Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…ribers Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1c0ca94 to
2292145
Compare
| #[test] | ||
| fn issue_1() { | ||
| let mut t = MqtTrie::default(); | ||
| t.insert("/#", 0); |
There was a problem hiding this comment.
Never realised that the root topic also could be empty, until seeing this test. What's weird is that both Topic::new("") and Topic::new("/") are valid topics, but neither Topic::new("/+") nor Topic::new("/#") are valid. And yet the trie accepts them. Hope that doesn't break any of the assumptions.
But that makes me wonder how the MQTT actor should react to invalid topics in the SubscriptionRequest, since SubscriptionDiff also stores the subscriptions as Hashset<String> and not Hashset<Topic>.
There was a problem hiding this comment.
They aren't valid topics since they contain wildcards, but they are valid topic filters to subscribe to. So you can't publish a message to /# but you can subscribe to it and receive a message on e.g. /a or the empty topic name ``
There was a problem hiding this comment.
but neither Topic::new("/+") nor Topic::new("/#") are valid
Aah yes, my bad. Wrongly used Topic instead of TopicFilter.
But that makes me wonder how the MQTT actor should react to invalid topics in the
SubscriptionRequest
I see that the trie is still silently accepting bad topic filters like a/#/a but internally keeping it as a/#. We probably need to use stronger types like TopicFilter at least in the SubscriptionRequest to prevent that.
There was a problem hiding this comment.
I see that the trie is silently accepting bad topic filters like
a/#/abut internally keeping it asa/#. We probably need to use stronger types likeTopicFilterto prevent that.
Yeah, but only at the entry point of the API i.e the 3 methods of MqtTrie.
See also for a similar comment: #3661 (comment)
This fixes the handling of subscriptions when we clone mqtt_channel::Config. With the previous behaviour, any `Config` clones shared all their subscriptions.
cf6005f to
1b249b3
Compare
Proposed changes
For the custom mapper, the user specifies which topics to subscribe the mapping pipelines to. In order to support dynamically reloading these pipelines, we need to be able to update the subscriptions. This was originally done in #3626, but this PR extracts out the changes for separate review.
To properly support dynamic subscriptions, the list of subscribers and their topic filters has been replaced with a trie. The general idea is that this structure maps topics to ids of subscribed clients, allowing us to efficiently retrieve the subscribed clients without checking every possible subscription. It manages which topics we subscribe to, ensuring we subscribe to a minimal set of topics to cover the currently subscribed clients, and (crucially) allows us to unsubscribe when topics are no longer consumed. Since this datastructure is separate, the goal is to have 100% unit test coverage here before merging (at the time of writing it is close to this, but not quite there).
How the trie is structured is detailed in
thin-edge.io/crates/extensions/tedge_mqtt_ext/src/trie.rs
Lines 7 to 77 in d8f3302
One thing this PR doesn't do currently is handle retain messages. The effect of this is that if client is subscribed to a topic with a retain message and then another client subscribes, this new client does not receive the retain message since no MQTT subscription is created in this case.
Types of changes
Paste Link to the issue
Checklist
just prepare-devonce)just formatas mentioned in CODING_GUIDELINESjust checkas mentioned in CODING_GUIDELINESFurther comments