Skip to content

feat: tedge flows, a configurable mapper using JavaScript#3650

Merged
didier-wenzek merged 72 commits intothin-edge:mainfrom
didier-wenzek:poc/tedge-quickjs-mapper
Aug 8, 2025
Merged

feat: tedge flows, a configurable mapper using JavaScript#3650
didier-wenzek merged 72 commits intothin-edge:mainfrom
didier-wenzek:poc/tedge-quickjs-mapper

Conversation

@didier-wenzek
Copy link
Copy Markdown
Contributor

@didier-wenzek didier-wenzek commented May 28, 2025

Proposed changes

A first step toward configurable mappers that users can extend and adapt to their use-cases
with their own filtering and message transformation rules.

The following steps are described here: #3756

Features:

  • Define the behavior of a mapper as a combination of flows, steps and transformation scripts
  • Consume, transform and produce MQTT messages.
  • Use JavaScript to define step functions
  • Define flows as chain of transformation steps: MQTT sub| step-1 |step-2 | ... | step-n | MQTT pub
  • Configure flows and steps
  • Dynamically update step config using MQTT
  • Implement state-full filters (deferring messages up to the end of a time window)
  • Combine user-provided JS filters with built-in Rust filters

Criteria:

  • Expressive power?
  • Dev tooling ?
  • Performance? Memory footprint?
  • Debuggability?

Examples:

  • Add missing timestamps
  • Discard measurements with timestamp out of the expected range (too old or in the future)
  • Translate thin-edge json measurment into cumulocity json
  • Add measurement units - read from topic metadata
  • Translate Collectd data into thin-edge JSON
  • Group messages received during the same time window
  • Compute 5-minute rolling average, min, max
  • A circuit breaker filter detecting that too many messages are being sent within a given period
  • See also real use-case examples

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

#3756

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

This PR uses quickJS as an alternative to:

@codecov
Copy link
Copy Markdown

codecov bot commented May 28, 2025

@didier-wenzek didier-wenzek marked this pull request as draft May 28, 2025 14:21
@didier-wenzek
Copy link
Copy Markdown
Contributor Author

The javascript examples have to be updated.

For instance the syntax config.topic || "te/device/main///m/collectd" is not supported. See fbe1341

@reubenmiller
Copy link
Copy Markdown
Contributor

The javascript examples have to be updated.

For instance the syntax config.topic || "te/device/main///m/collectd" is not supported. See fbe1341

Though it seems that Optional Chaining is part of ES2020 (which means it should be supported by QuickJS, I confirmed on the WASM version of QuickJS at least), so that would be the most similar to your previous syntax (just adding a ? before the .).

For example:

let topic = config?.topic || "te/device/main///m/collectd";

Copy link
Copy Markdown
Contributor

@albinsuresh albinsuresh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is my first round of review comments focussing purely on the pipeline APIs and not much of the rust infra code. Unfortunately, I'm having some trouble testing the feature due to some certificate misconfiguration preventing the mapper from even starting with the following error:

Jun 10 06:36:23 2a9636a9acc5 tedge-mapper[3107]: 2025-06-10T06:36:23.630655575Z ERROR Runtime: Actor MQTT-2 has finished unsuccessfully: ActorError(InvalidPrivateKey(InvalidCertificate(Other(OtherError(UnsupportedCertVersion)))))

Will provide additional usability feedback once that issue is resolved.

stages = [
{ filter = "add_timestamp.js" },
{ filter = "drop_stragglers.js", config = { max_delay = 60 } },
{ filter = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
Copy link
Copy Markdown
Contributor

@albinsuresh albinsuresh Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if stage-wise meta_topics is really necessary or a pipe-line level meta_topics would be sufficient (and simpler to maintain).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we want here is for the messages received on the meta topics to be directly delivered to the stage. The other stages have no way to do something meaningful with these messages (In this case, units for various measurement types).

A better name might be config_topics.

stages = [
{ filter = "add_timestamp.js" },
{ filter = "drop_stragglers.js", config = { max_delay = 60 } },
{ filter = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expressing the correlation between the input_topic and meta_topic with wildcards appears slightly inaccurate/misleading. For example, when the above definition is applied on a te/+/+/+/+/m/temperature measurement, the corresponding meta_topic should have only been te/+/+/+/+/m/temperature/meta. But with the wildcard te/+/+/+/+/m/+/meta subscription, it covers the meta topics of other measurements like m/pressure/meta, m/humidity/meta etc as well. Wondering if we should introduce some sort of variable substitution syntax like m/{m_type} and m/{m_type}/meta so that this correlation is better expressed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you propose seems too specific to this specific example (adding units to measurements leveraging the meta topics as defined by thin-edge). Sure the script (here te_to_c8y.js) has to appropriately match data and meta data, but why should the generic mapper enforce that from the outside?

@albinsuresh
Copy link
Copy Markdown
Contributor

Unfortunately, I'm having some trouble testing the feature due to some certificate misconfiguration preventing the mapper from even starting with the following error:

Jun 10 06:36:23 2a9636a9acc5 tedge-mapper[3107]: 2025-06-10T06:36:23.630655575Z ERROR Runtime: Actor MQTT-2 has finished unsuccessfully: ActorError(InvalidPrivateKey(InvalidCertificate(Other(OtherError(UnsupportedCertVersion)))))

Will provide additional usability feedback once that issue is resolved.

Though I couldn't fix the cert issue, I've moved on by completely disabling auth settings.


stages = [
{ filter = "add_timestamp.js" },
{ filter = "circuit-breaker.js", tick_every_seconds = 1, config = { stats_topic = "te/error", too_many = 10000, message_on_too_many = { topic = "te/device/main///a/too-many-messages", payload = "too many messages" }, message_on_back_to_normal = { topic = "te/device/main///a/too-many-messages", payload = "back to normal" } } }
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line highlights pain points with the current configuration format.

  • There are two levels of config which is confusing. tick_every_second is used by the engine, while config.too_many is passed to the script. Flattening the config might be simpler.
  • TOML inline tables are intended to appear on a single line. Leading to painfully long lines as here.
  • Passing a JSON payload to the filter config is not straightforward. Currently the engine expects a string and the JS filter has to stringyfy the payload.

didier-wenzek and others added 30 commits August 8, 2025 14:22
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
In some context `Flow script` is more appropriate.
For instance, when two steps use the same script.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
- process -> onMessage
- tick -> onInterval
- updateConfig -> OnConfigUpdate

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Add timestamp to message object

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: reubenmiller <reuben.d.miller@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
For now, the bindgen feature is enable for all targets.
One could consider to use bindings shipped with quickjs when available.
See https://github.com/delskayn/rquickjs?tab=readme-ov-file#supported-platforms

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
For now, the bindgen feature is enable for all targets.
One could consider to use bindings shipped with quickjs when available.
See https://github.com/delskayn/rquickjs?tab=readme-ov-file#supported-platforms

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The goal is to be in position to add other types of inputs.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: reubenmiller <reuben.d.miller@gmail.com>
Signed-off-by: reubenmiller <reuben.d.miller@gmail.com>
…ort compiling quickjs

Signed-off-by: reubenmiller <reuben.d.miller@gmail.com>
Signed-off-by: reubenmiller <reuben.d.miller@gmail.com>
Signed-off-by: reubenmiller <reuben.d.miller@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The type has also been changed, from a number of seconds
to a human time duration.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
error: hiding a lifetime that's elided elsewhere is confusing

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature Change request theme:developer-exp Theme: improve developer experience theme:flows

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants