refactor: handle retain messages with dynamic subscriptions#3710
Conversation
Codecov ReportAttention: Patch coverage is 📢 Thoughts on this report? Let us know! 🚀 New features to boost your workflow:
|
Robot Results
|
| client.unsubscribe_many(diff.unsubscribe).await.unwrap(); | ||
| } | ||
| }); | ||
| if !tf.is_empty() { |
There was a problem hiding this comment.
I'm wondering if this would meet the dynamic subscription requirements of the dynamic mapper, where in most cases when new pipelines are dynamically added, they are most likely going to subscribe to already subscribed topics like te/+/+/+/+/m/temp/meta and would be expecting the retained messages from those topics. But, since dynamic connections are established only when a new subscription is added or an existing one is upgraded, the newly added pipelines won't get the existing retained messages as far as I understand. This won't meet our requirement, right? I'm just wondering if blindly establishing a new connection per peer/client (pipeline) would be better here.
There was a problem hiding this comment.
But, since dynamic connections are established only when a new subscription is added or an existing one is upgraded
No, the exact opposite. We establish a dynamic connection when we receive a subscription that does already exist.
I'm just wondering if blindly establishing a new connection would be better here.
If we did this blindly for all topics, all that would achieve is retain messages sent to newly subscribed topics would be received twice.
There was a problem hiding this comment.
No, the exact opposite. We establish a dynamic connection when we receive a subscription that does already exist.
Not sure what I was doing wrong then. When I tested the same by tweaking the dynamic_subscribers_receive_retain_messages test with the following code at the end:
client_0
.send(PublishOrSubscribe::Subscribe(SubscriptionRequest {
diff: SubscriptionDiff {
subscribe: ["a/b".into()].into(),
unsubscribe: [].into(),
},
client_id: client_id_1,
}))
.await
.unwrap();
assert_eq!(timeout(client_0.recv()).await.unwrap(), msg);
the assertion validating if client0 gets the same retained message again, timed out.
There was a problem hiding this comment.
That is the intention. A new client subscribing shouldn't redeliver the retain messages to an existing subscription?
There was a problem hiding this comment.
That's where I see a problem. For all the pipelines, the generic mapper is the only peer of the mqtt actor, right? While it is connected to the mqtt actor while being built, it would have declared a set of subscriptions based on all the pipelines that existed at that time. And when a new pipeline is dynamically registered, but with some topics that the previous pipelines had already subscribed, it should still get the retained messages on those previously registered topics as well, right? Since the new pipeline is not registering as a new peer/client, but issuing the subscription request via the same gen mapper peer which has already received the retained messages for the same topics, it wouldn't get those retained messages even though they are "new" to the new pipeline, right? That's my concern.
There was a problem hiding this comment.
As that is not so easy to fix, an option is to let the tedge_gen_mapper maintains a connection to the MQTT actor per pipeline.
There was a problem hiding this comment.
With the dynamic mapper more likely to use the newly added dynamic connection requests, I'm just wondering who/when the dynamic subscription requests would be used.
There was a problem hiding this comment.
I think the mapper should be using both (if we change the input topic for a mapping rule, for instance, that will cause a change in the subscription, but shouldn't need a new MQTT actor connection).
Also, the "connection" part has nothing to do with MQTT connections and is merely a dynamic connection between two actors.
5664833 to
6d510fc
Compare
didier-wenzek
left a comment
There was a problem hiding this comment.
Nice improvement. I just have a proposal to simplify actor connections.
albinsuresh
left a comment
There was a problem hiding this comment.
Things look fine from a functionality perspective, except for one remark on the async nature of clearing entity data (which we can discuss). But, it feels like the connection logic with the MQTT actor has gotten a lot more complicated now. Unfortunately, I don't have any concrete suggestions yet on how to improve it.
| client.unsubscribe_many(diff.unsubscribe).await.unwrap(); | ||
| } | ||
| }); | ||
| if !tf.is_empty() { |
There was a problem hiding this comment.
With the dynamic mapper more likely to use the newly added dynamic connection requests, I'm just wondering who/when the dynamic subscription requests would be used.
didier-wenzek
left a comment
There was a problem hiding this comment.
I will be happy to merge this PR, once fixed the remaining small comments.
e653586 to
053328f
Compare
didier-wenzek
left a comment
There was a problem hiding this comment.
Approved, the failing system test (related to PKCS11) being fixed on the main branch.
Thank you for your perseverance in finding the most appropriate approach.
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…scriptions Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
| Subscribe(SubscriptionRequest), | ||
| /// A one-shot request for all the retain messages for a set of topics | ||
| /// | ||
| /// The provided sender is used to |
There was a problem hiding this comment.
| /// The provided sender is used to | |
| /// The provided sender is used to send those retained messages back to the requesting peer |
053328f to
9e0fa9a
Compare
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
9e0fa9a to
bdbab78
Compare
Proposed changes
Handle retain messages when dynamic subscribers subscribe to a topic with an existing subscription. With the original implementation, new subscribers to topics that are already subscribed to wouldn't receive retain messages, e.g.
retain message exists on a/b
client a connections with subscription to a/b
client b connects with no subscriptions
mqtt actor connects -> retain message is received and forwarded to client a
client b subscribes to a/b
the message won't be resent to the mqtt actor for forwarding since an existing subscription exists
With these changes, the subscription from client b will cause a connection to be created dynamically to retrieve the relevant retain messages (i.e. the ones not covered by a new subscription).
In the case of an existing subscription to
a/band a new subscription toa/+, both the existing client and the newly subscribed client will receive the retain message. I don't believe this is a huge issue, as spontaneous redelivery of retain messages can occur regardless in the case of connection flakiness, for example.Still TODO:
Types of changes
Paste Link to the issue
Checklist
just prepare-devonce)just formatas mentioned in CODING_GUIDELINESjust checkas mentioned in CODING_GUIDELINESFurther comments