refactor: handle retain messages with dynamic subscriptions #3710

Merged
jarhodes314 merged 7 commits into thin-edge:main from jarhodes314:refactor/dynamic-subscription-retain
Jul 25, 2025

Conversation

@jarhodes314
Contributor

@jarhodes314 jarhodes314 commented Jun 26, 2025

Proposed changes

Handle retain messages when dynamic subscribers subscribe to a topic with an existing subscription. With the original implementation, new subscribers to topics that are already subscribed to wouldn't receive retain messages, e.g.

  • a retain message exists on a/b
  • client a connects with a subscription to a/b
  • client b connects with no subscriptions
  • the mqtt actor connects -> the retain message is received and forwarded to client a
  • client b subscribes to a/b
  • the retain message isn't resent to the mqtt actor for forwarding to client b, since a subscription to a/b already exists

With these changes, the subscription from client b will cause a connection to be created dynamically to retrieve the relevant retain messages (i.e. the ones not covered by a new subscription).

In the case of an existing subscription to a/b and a new subscription to a/+, both the existing client and the newly subscribed client will receive the retain message. I don't believe this is a huge issue, as spontaneous redelivery of retain messages can occur regardless in the case of connection flakiness, for example.
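The overlap between a/b and a/+ described above can be illustrated with a small sketch. This is not code from the PR: `filter_matches` and `recipients` are hypothetical helpers, and the matching follows the standard MQTT wildcard rules (`+` for a single level, `#` for all remaining levels), assuming well-formed filters where `#` only appears as the last level.

```rust
use std::collections::BTreeMap;

/// Returns true if an MQTT topic filter matches a concrete topic.
/// Assumes a well-formed filter (`#` only as the last level).
fn filter_matches(filter: &str, topic: &str) -> bool {
    let mut f = filter.split('/');
    let mut t = topic.split('/');
    loop {
        match (f.next(), t.next()) {
            // `#` matches the parent level and everything below it
            (Some("#"), _) => return true,
            // `+` matches exactly one level
            (Some("+"), Some(_)) => continue,
            (Some(a), Some(b)) if a == b => continue,
            // both exhausted at the same time: full match
            (None, None) => return true,
            _ => return false,
        }
    }
}

/// Hypothetical helper: the clients that a message on `topic` is forwarded to,
/// given each client's subscribed filters.
fn recipients<'a>(subs: &BTreeMap<&'a str, Vec<&'a str>>, topic: &str) -> Vec<&'a str> {
    subs.iter()
        .filter(|(_, filters)| filters.iter().any(|f| filter_matches(f, topic)))
        .map(|(client, _)| *client)
        .collect()
}

fn main() {
    let mut subs = BTreeMap::new();
    subs.insert("client a", vec!["a/b"]); // existing subscription
    subs.insert("client b", vec!["a/+"]); // new, overlapping subscription
    // The retain message on a/b arrives again because of the new a/+ subscription,
    // and both clients have a matching filter.
    println!("{:?}", recipients(&subs, "a/b")); // prints ["client a", "client b"]
}
```

Because forwarding is driven by filter matching alone, the existing client cannot be told apart from the new one in this model, which is why the redelivery to client a is accepted rather than filtered out.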

Still TODO:

  • Integrate these changes with the entity store retain message clearing

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request June 26, 2025 10:11 — with GitHub Actions Inactive
@jarhodes314 jarhodes314 added the refactoring (Developer value) and theme:mqtt (Theme: mqtt and mosquitto related topics) labels Jun 26, 2025
@codecov

codecov bot commented Jun 26, 2025

Codecov Report

Attention: Patch coverage is 91.13372% with 61 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| crates/extensions/tedge_mqtt_ext/src/lib.rs | 88.30% | 34 Missing and 6 partials ⚠️ |
| crates/extensions/tedge_mqtt_ext/src/tests.rs | 95.76% | 0 Missing and 8 partials ⚠️ |
| ...rates/core/tedge_agent/src/entity_manager/tests.rs | 89.06% | 7 Missing ⚠️ |
| crates/common/mqtt_channel/src/tests.rs | 89.36% | 0 Missing and 5 partials ⚠️ |
| crates/common/mqtt_channel/src/connection.rs | 83.33% | 0 Missing and 1 partial ⚠️ |

@github-actions
Contributor

github-actions bot commented Jun 26, 2025

Robot Results

| ✅ Passed | ❌ Failed | ⏭️ Skipped | Total | Pass % | ⏱️ Duration |
| --- | --- | --- | --- | --- | --- |
| 666 | 0 | 3 | 666 | 100 | 1h49m21.293545s |

client.unsubscribe_many(diff.unsubscribe).await.unwrap();
}
});
if !tf.is_empty() {
Contributor

@albinsuresh albinsuresh Jun 27, 2025

I'm wondering if this would meet the dynamic subscription requirements of the dynamic mapper, where in most cases when new pipelines are dynamically added, they are most likely going to subscribe to already subscribed topics like te/+/+/+/+/m/temp/meta and would be expecting the retained messages from those topics. But, since dynamic connections are established only when a new subscription is added or an existing one is upgraded, the newly added pipelines won't get the existing retained messages as far as I understand. This won't meet our requirement, right? I'm just wondering if blindly establishing a new connection per peer/client (pipeline) would be better here.

Contributor Author

> But, since dynamic connections are established only when a new subscription is added or an existing one is upgraded

No, the exact opposite. We establish a dynamic connection when we receive a subscription that does already exist.

> I'm just wondering if blindly establishing a new connection would be better here.

If we did this blindly for all topics, all that would achieve is that retain messages on newly subscribed topics would be received twice.
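A minimal sketch of that distinction, under stated assumptions: `Plan` and `plan_subscription` are hypothetical names, not the actor's real types, and filters are compared by exact string equality only (wildcard overlap such as a/+ versus a/b is ignored here). Filters that are new get a normal subscription, which already brings their retain messages; only filters that already exist need a separate retrieval.

```rust
use std::collections::BTreeSet;

/// Hypothetical summary of what to do for one incoming subscription request.
#[derive(Debug, PartialEq)]
struct Plan {
    /// Filters not subscribed so far: a normal subscription delivers their retain messages.
    subscribe: Vec<String>,
    /// Filters that already exist: their retain messages need a separate, one-shot retrieval.
    fetch_retained: Vec<String>,
}

/// Splits the requested filters and records the new ones in `existing`.
/// Assumption: exact string comparison, no wildcard overlap handling.
fn plan_subscription(existing: &mut BTreeSet<String>, requested: &[&str]) -> Plan {
    let mut plan = Plan { subscribe: Vec::new(), fetch_retained: Vec::new() };
    for filter in requested {
        // `insert` returns true only if the filter was not present yet
        if existing.insert(filter.to_string()) {
            plan.subscribe.push(filter.to_string());
        } else {
            plan.fetch_retained.push(filter.to_string());
        }
    }
    plan
}

fn main() {
    let mut existing = BTreeSet::from(["a/b".to_string()]);
    let plan = plan_subscription(&mut existing, &["a/b", "c/d"]);
    // a/b already exists, so it needs a one-shot retrieval; c/d is simply subscribed to.
    println!("{plan:?}");
}
```

Fetching retain messages for every requested filter instead would make `c/d` appear in both lists, which is the double delivery mentioned above.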

Contributor

> No, the exact opposite. We establish a dynamic connection when we receive a subscription that does already exist.

Not sure what I was doing wrong then. When I tested the same by tweaking the dynamic_subscribers_receive_retain_messages test with the following code at the end:

client_0
        .send(PublishOrSubscribe::Subscribe(SubscriptionRequest {
            diff: SubscriptionDiff {
                subscribe: ["a/b".into()].into(),
                unsubscribe: [].into(),
            },
            client_id: client_id_1,
        }))
        .await
        .unwrap();
assert_eq!(timeout(client_0.recv()).await.unwrap(), msg);

the assertion checking whether client_0 receives the same retained message again timed out.

Contributor Author

That is the intention. A new client subscribing shouldn't redeliver the retain messages to an existing subscription?

Contributor

That's where I see a problem. For all the pipelines, the generic mapper is the only peer of the mqtt actor, right? While it is connected to the mqtt actor while being built, it would have declared a set of subscriptions based on all the pipelines that existed at that time. And when a new pipeline is dynamically registered, but with some topics that the previous pipelines had already subscribed, it should still get the retained messages on those previously registered topics as well, right? Since the new pipeline is not registering as a new peer/client, but issuing the subscription request via the same gen mapper peer which has already received the retained messages for the same topics, it wouldn't get those retained messages even though they are "new" to the new pipeline, right? That's my concern.

Contributor

As that is not so easy to fix, an option is to let the tedge_gen_mapper maintain a connection to the MQTT actor per pipeline.

Contributor

With the dynamic mapper more likely to use the newly added dynamic connection requests, I'm just wondering who/when the dynamic subscription requests would be used.

Contributor Author

I think the mapper should be using both (if we change the input topic for a mapping rule, for instance, that will cause a change in the subscription, but shouldn't need a new MQTT actor connection).

Also, the "connection" part has nothing to do with MQTT connections and is merely a dynamic connection between two actors.

@jarhodes314 jarhodes314 force-pushed the refactor/dynamic-subscription-retain branch from 5664833 to 6d510fc Compare July 14, 2025 15:37
@reubenmiller reubenmiller marked this pull request as draft July 15, 2025 09:06
@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request July 16, 2025 16:10 — with GitHub Actions Inactive
@jarhodes314 jarhodes314 marked this pull request as ready for review July 17, 2025 12:58
@didier-wenzek didier-wenzek temporarily deployed to Test Pull Request July 18, 2025 11:26 — with GitHub Actions Inactive
Contributor

@didier-wenzek didier-wenzek left a comment

Nice improvement. I just have a proposal to simplify actor connections.

Contributor

@albinsuresh albinsuresh left a comment

Things look fine from a functionality perspective, except for one remark on the async nature of clearing entity data (which we can discuss). But, it feels like the connection logic with the MQTT actor has gotten a lot more complicated now. Unfortunately, I don't have any concrete suggestions yet on how to improve it.

Contributor

@didier-wenzek didier-wenzek left a comment

I will be happy to merge this PR, once fixed the remaining small comments.

Contributor

@didier-wenzek didier-wenzek left a comment

Approved, the failing system test (related to PKCS11) being fixed on the main branch.

Thank you for your perseverance in finding the most appropriate approach.

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
…scriptions

Contributor

@albinsuresh albinsuresh left a comment

LGTM.

Subscribe(SubscriptionRequest),
/// A one-shot request for all the retain messages for a set of topics
///
/// The provided sender is used to
Contributor

Suggested change
- /// The provided sender is used to
+ /// The provided sender is used to send those retained messages back to the requesting peer
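For illustration, a one-shot request carrying its own reply sender might look roughly like the sketch below. This is not the actual type from tedge_mqtt_ext: `Request`, `RetainedOnce`, `reply_to` and `handle` are hypothetical names, `std::sync::mpsc` stands in for whatever channel type the actors really use, and the retained messages are looked up in a plain map by exact topic rather than fetched from a broker.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};

/// Hypothetical, simplified request type; names are illustrative only.
enum Request {
    /// A one-shot request for the retain messages on a set of topics.
    /// `reply_to` is used to send those messages back to the requesting peer.
    RetainedOnce {
        topics: Vec<String>,
        reply_to: Sender<(String, String)>,
    },
}

/// Answers a request from an in-memory map of topic -> retained payload
/// (an assumption made to keep the sketch self-contained).
fn handle(request: Request, retained: &HashMap<String, String>) {
    match request {
        Request::RetainedOnce { topics, reply_to } => {
            for topic in topics {
                if let Some(payload) = retained.get(&topic) {
                    // A send error only means the requester has gone away.
                    let _ = reply_to.send((topic, payload.clone()));
                }
            }
            // `reply_to` is dropped here, which closes the channel:
            // the requester sees the end of the stream after the last message.
        }
    }
}

fn main() {
    let retained = HashMap::from([("a/b".to_string(), "21.5".to_string())]);
    let (tx, rx) = channel();
    let request = Request::RetainedOnce {
        topics: vec!["a/b".to_string(), "c/d".to_string()],
        reply_to: tx,
    };
    handle(request, &retained);
    // Only a/b has a retained message; iteration stops once the sender is dropped.
    for (topic, payload) in rx {
        println!("{topic}: {payload}");
    }
}
```

Passing the sender inside the request keeps the reply path separate from the peer's regular message stream, so the retained messages reach only the requester.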

@jarhodes314 jarhodes314 force-pushed the refactor/dynamic-subscription-retain branch from 053328f to 9e0fa9a Compare July 25, 2025 10:03
@jarhodes314 jarhodes314 force-pushed the refactor/dynamic-subscription-retain branch from 9e0fa9a to bdbab78 Compare July 25, 2025 10:13
@jarhodes314 jarhodes314 temporarily deployed to Test Pull Request July 25, 2025 10:13 — with GitHub Actions Inactive
@jarhodes314 jarhodes314 added this pull request to the merge queue Jul 25, 2025
Merged via the queue into thin-edge:main with commit 4590e26 Jul 25, 2025
51 of 52 checks passed
3 participants