
poc: Generic Mapper using JavaScript #3626

Closed
didier-wenzek wants to merge 18 commits into thin-edge:main from didier-wenzek:poc/tedge-js-mapper




@didier-wenzek didier-wenzek commented May 15, 2025

Proposed changes

This POC explores an alternative to #3517

A thin-edge mapper that can be extended by users with mapping rules implemented in JavaScript.

Features:

  • Chain stateless filters along pipelines: MQTT sub | filter-1 | filter-2 | ... | filter-n | MQTT pub
  • Configure filters
  • Update filter config using MQTT
  • Implement stateful filters (deferring messages until the end of a time window)
  • Combine user-provided JS filters with built-in Rust filters
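
To make the pipeline idea concrete, here is a minimal sketch in Rust of chaining stateless filters. It is not the code of this PR: `Message`, `Filter`, `run_pipeline`, `add_timestamp` and `drop_stale` are hypothetical names, and the MQTT input and output ends are left out. Each filter maps one message to zero, one or several messages, so a stage can both transform and discard.

```rust
/// A message flowing through a pipeline (hypothetical shape, for illustration only).
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
    pub topic: String,
    pub payload: String,
    /// Seconds since the Unix epoch; `None` when the source omitted it.
    pub timestamp: Option<u64>,
}

/// A stateless filter maps one input message to zero, one or several output messages.
pub type Filter = Box<dyn Fn(Message) -> Vec<Message>>;

/// Run a message through every stage in order: sub | filter-1 | ... | filter-n | pub.
pub fn run_pipeline(filters: &[Filter], input: Message) -> Vec<Message> {
    let mut batch = vec![input];
    for filter in filters {
        // Each stage consumes the output of the previous one.
        batch = batch.into_iter().flat_map(|m| filter(m)).collect();
    }
    batch
}

/// Example stage: add a timestamp to messages that lack one.
pub fn add_timestamp(now: u64) -> Filter {
    Box::new(move |mut m: Message| -> Vec<Message> {
        if m.timestamp.is_none() {
            m.timestamp = Some(now);
        }
        vec![m]
    })
}

/// Example stage: drop messages that are too old or too far in the future.
pub fn drop_stale(now: u64, max_age: u64, max_skew: u64) -> Filter {
    Box::new(move |m: Message| -> Vec<Message> {
        let keep = matches!(
            m.timestamp,
            Some(t) if t.saturating_add(max_age) >= now && t <= now.saturating_add(max_skew)
        );
        if keep { vec![m] } else { Vec::new() }
    })
}
```

With `vec![add_timestamp(now), drop_stale(now, 60, 5)]` as the stage list, a message without a timestamp is stamped and kept, while a message older than 60 seconds yields an empty output. In the POC the stages would be user-provided JavaScript functions rather than Rust closures.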

Criteria:

  • Expressive power?
  • Dev tooling?
  • Performance? Memory footprint?
  • Debuggability?

Examples:

  • Add missing timestamps
  • Discard measurements with timestamp out of the expected range (too old or in the future)
  • Translate thin-edge JSON measurements into Cumulocity JSON
  • Add measurement units - read from topic metadata
  • Translate Collectd data into thin-edge JSON
  • Group messages received during the same time window
  • Compute 5-minute rolling average
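
The last example needs state that survives between messages. A minimal sketch of such a stateful stage in Rust follows, assuming samples arrive in non-decreasing timestamp order. `RollingAverage` and its methods are hypothetical names, not part of this PR, and timestamps are plain seconds.

```rust
use std::collections::VecDeque;

/// Rolling average over the samples received during the last `window` seconds.
/// Hypothetical helper; assumes timestamps never go backwards.
pub struct RollingAverage {
    window: u64,
    samples: VecDeque<(u64, f64)>,
}

impl RollingAverage {
    pub fn new(window: u64) -> Self {
        assert!(window > 0, "the window must span at least one second");
        RollingAverage { window, samples: VecDeque::new() }
    }

    /// Record a sample, evict the ones that left the window,
    /// and return the average of the samples still inside it.
    pub fn push(&mut self, timestamp: u64, value: f64) -> f64 {
        self.samples.push_back((timestamp, value));
        // A sample stays only while `t + window > timestamp`.
        while let Some(&(t, _)) = self.samples.front() {
            if t.saturating_add(self.window) > timestamp {
                break;
            }
            self.samples.pop_front();
        }
        // The sample just pushed is always retained, so the length is never zero.
        let sum: f64 = self.samples.iter().map(|&(_, v)| v).sum();
        sum / self.samples.len() as f64
    }
}
```

A 5-minute average is `RollingAverage::new(300)`. Recomputing the sum on every call keeps the sketch free of floating-point drift at the cost of a pass over the window; a running sum would be the obvious optimisation.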

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue


Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments


codecov bot commented May 15, 2025

Codecov Report

Attention: Patch coverage is 80.18868% with 21 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
crates/extensions/tedge_mqtt_ext/src/lib.rs | 80.23% | 13 Missing and 4 partials ⚠️
crates/common/mqtt_channel/src/connection.rs | 50.00% | 2 Missing ⚠️
crates/core/plugin_sm/src/operation_logs.rs | 0.00% | 0 Missing and 1 partial ⚠️
crates/core/tedge_mapper/src/lib.rs | 0.00% | 1 Missing ⚠️


This commit is a very first step, scaffolding a generic mapper.
The aim is to let users define their own mapping rules
to transform, filter or enrich data received from various sources.

The idea is to form pipelines of user-provided transformation functions that:

- consume messages from MQTT,
- stream these messages along the transformation stages,
- publish back to MQTT the resulting messages.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
…mapper

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
NB: This doesn't currently work since dynamic subscriptions aren't fully hooked up. tedge_mqtt_ext needs some further modifications to support sending the messages received on newly subscribed topics to the actor that requested the subscription.

Signed-off-by: James Rhodes <jarhodes314@gmail.com>
}
},
Some(InputMessage::FsWatchEvent(e)) => {
tracing::warn!("TODO do something with {e:?}")

We can definitely ignore anything related to a file with an unknown extension (such as .collectd.toml.swp and collectd.toml~).

  • Deletion of a pipeline should be okay, unless we try to unsubscribe from its input topics.
  • Deletion of a script is more problematic if it is still used by one of the pipelines.
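
A minimal sketch of that filtering step, using only the extension of the path carried by the file-system event. The extension mapping is an assumption made for illustration (`.toml` for pipeline definitions, `.js` for scripts), and `WatchedFile` and `classify` are hypothetical names.

```rust
use std::path::Path;

/// What a changed file means for the mapper (hypothetical classification).
#[derive(Debug, PartialEq)]
pub enum WatchedFile {
    Pipeline,
    Script,
    Ignored,
}

/// Classify a path by its extension.
/// Assumption: pipelines are `.toml` files and scripts are `.js` files.
pub fn classify(path: &Path) -> WatchedFile {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some("toml") => WatchedFile::Pipeline,
        Some("js") => WatchedFile::Script,
        // Editor swap files, backups and anything else are skipped.
        _ => WatchedFile::Ignored,
    }
}
```

Both files named in the comment fall through to `Ignored`: `Path::extension` yields `swp` for `.collectd.toml.swp` and `toml~` for `collectd.toml~`.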

  ///
  /// Default: An empty topic list
- pub subscriptions: TopicFilter,
+ pub subscriptions: Arc<Mutex<TopicFilter>>,

It is a bit unusual to have interior mutability in a config struct, which one usually takes for granted as immutable.

Comment on lines +212 to +213
client: mqtt_channel::AsyncClient,
subscriptions: Arc<Mutex<TopicFilter>>,

I would combine these two fields of an mqtt_channel::Connection into a single one (say a subscriber) that handles the whole logic below to compute the new subscription set and update mqtt_channel::AsyncClient.
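
One possible shape for such a subscriber, as a rough sketch only. `Subscriber`, `SubscribeClient` and `RecordingClient` are hypothetical stand-ins; the real client is asynchronous, and a real `TopicFilter` also understands wildcards, whereas this sketch compares topic strings literally.

```rust
use std::collections::BTreeSet;

/// Stand-in for the MQTT client: only the capability used here is modelled.
pub trait SubscribeClient {
    fn subscribe_many(&mut self, topics: Vec<String>);
}

/// Test double that records each subscription request.
#[derive(Default)]
pub struct RecordingClient {
    pub calls: Vec<Vec<String>>,
}

impl SubscribeClient for RecordingClient {
    fn subscribe_many(&mut self, topics: Vec<String>) {
        self.calls.push(topics);
    }
}

/// Owns both the client and the current subscription set,
/// so the update logic lives in a single place.
pub struct Subscriber<C> {
    client: C,
    subscriptions: BTreeSet<String>,
}

impl<C: SubscribeClient> Subscriber<C> {
    pub fn new(client: C) -> Self {
        Subscriber { client, subscriptions: BTreeSet::new() }
    }

    pub fn client(&self) -> &C {
        &self.client
    }

    /// Subscribe to the requested topics that are not subscribed yet.
    /// Returns the newly subscribed topics, sorted and without duplicates.
    pub fn extend(&mut self, requested: &[&str]) -> Vec<String> {
        let to_sub: BTreeSet<String> = requested
            .iter()
            .map(|topic| topic.to_string())
            .filter(|topic| !self.subscriptions.contains(topic))
            .collect();
        if !to_sub.is_empty() {
            // Only talk to the broker when there is something new.
            self.client.subscribe_many(to_sub.iter().cloned().collect());
            self.subscriptions.extend(to_sub.iter().cloned());
        }
        to_sub.into_iter().collect()
    }
}
```

Keeping the set and the client behind one type would also remove the need for a shared `Arc<Mutex<TopicFilter>>` on the config side, since only the subscriber mutates the set.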

let to_sub = to_sub.filters();
if !to_sub.is_empty() {
info!("Updating MQTT subscription to include {to_sub:?}");
client.subscribe_many(to_sub).await.unwrap();

> NB: This doesn't currently work since dynamic subscriptions aren't fully hooked up. tedge_mqtt_ext needs some further modifications to support sending the messages received on newly subscribed topics to the actor that requested the subscription.

Okay, I understand this comment now. The MQTT subscription is correctly updated, but the messages are not forwarded by this actor, because the subscription list held in ToPeers::peer_senders has not been updated.

Indeed, it is not easy to update this peer_senders list from the FromPeers event loop, since the list is owned by the ToPeers event loop. A new channel would not be enough, as these peer senders are anonymous: which one would have to be updated?


Here is a proposal to fix that: 43932cf

@didier-wenzek

Closing this POC which is superseded by #3650.

The main reason is the executable size: 59M (release build, dynamically linked, stripped), compared to 17M for the POC using QuickJS (release build, statically linked, stripped) and 16M for tedge without any JS support (release build, statically linked, stripped).
