
materialization triggers #2800

Merged
williamhbaker merged 7 commits into master from wb/triggers
Mar 24, 2026

Conversation

@williamhbaker
Member

@williamhbaker williamhbaker commented Mar 20, 2026

Description:

Adds a new triggers configuration to materializations that fires webhook requests after each committed transaction. This enables users to notify downstream services (e.g. kick off a dbt job) when new data has been materialized.

See individual commit messages for specifics on the implementation, as well as this design discussion where higher-level points are described.

Testing:

  • Unit tests exercise the flow of persisting trigger variables and firing the triggers (webhooks)
  • Ran on a local stack: used flowctl to publish a config and tested payload delivery to webhook.site. With flowctl, the non-secret, non-HMAC-protected values can be changed without decrypting the config.

These things are called "Triggers" as an overarching concept, even though they are webhooks, and the terminology of webhooks is inevitably mixed in. I wanted to use a more general term for the feature name, since it is perhaps conceivable that other kinds of triggers could be imagined in the future.

Workflow steps:

Create/edit materialization configs with the new triggers field, see the docs with this PR for examples.

Documentation links affected:

Some initial docs are included in this PR.

Notes for reviewers:

The additions to flow-web are somewhat speculative. I'm fairly confident the UI will eventually need the triggers config schema for rendering a form. The helpers strip_trigger_hmac_excluded_fields & restore_trigger_hmac_excluded_fields may provide behavior equivalent to what is built into flowctl for manipulating encrypted configs, but how this will actually be implemented in the UI form handling is TBD.

@github-actions

github-actions bot commented Mar 20, 2026

PR Preview Action v1.8.1
Preview removed because the pull request was closed.
2026-03-24 21:43 UTC

.map(|c| clock_to_rfc3339(&c))
.unwrap_or_default();

let flow_run_id = uuid::Uuid::new_v4().to_string();
Member Author

@williamhbaker williamhbaker Mar 20, 2026

I originally had a UUID here mostly as a placeholder, and then never came back around to thinking about it anymore. Any random string should do the trick, since it is durably persisted and recovered when retrying the webhook delivery.

But perhaps there is something more useful we could use here - I am considering a hex-encoded hash of the runtime checkpoint, for example.

Member

A thought: what about a wall-clock transaction time, such as the start time of the transaction from its own stats? More information bearing, monotonic, but also unique.

I don't think we'll regret a v4 UUID, but I'm also not convinced it rises to the level of deserving "an ID", which elevates it to a heaviness of concept / gravity of thing-ness that doesn't quite seem warranted.

pub headers: BTreeMap<String, String>,
/// # Handlebars template for the JSON payload body.
pub payload_template: String,
/// # Request timeout in seconds.
Member

nit: perhaps note that the task is failed if timeout_secs + max_attempts is violated.

Also, use humantime rather than a _secs suffix. See how min_txn_duration / max_txn_duration work as examples.

Member Author

Added that note, and also switched to humantime for the timeout.

pub materialization_name: String,
pub flow_published_at_min: String,
pub flow_published_at_max: String,
pub flow_run_id: String,
Member

nit: just run_id ? flow_published_at is a term of art we're pretty locked into, but in general we're moving away from "Estuary Flow" branding.

If this name has been discussed / agreed to already and I'm unaware, feel free to ignore.

Member Author

Changed to run_id! I don't think anything's been set in stone here, so that should be fine.

Some(t) => {
validate_triggers(scope.push_prop("triggers"), &t.config, errors);

// TODO(whb): SOPS HMAC integrity of encrypted triggers is currently
Member

We'd want to wire it into the Materialization Validate RPC, and have the runtime crate unseal as part of that call's implementation. That would happen within the data-plane that's intended to actually fire the triggers / has SOPS decrypt privilege.

Makes sense to do as a separate PR, though, and maybe not very high priority unless it's an issue in practice

Member Author

Oh yeah, that makes sense. Based on your other comment, I moved this decrypting into runtime::materialize::connector. It would be straightforward to include the trigger spec in the Validate RPC and attempt a decryption there... but yeah, I'll wait to do that as a follow-up if it's warranted.

&& let Some(variables) =
db.load_trigger_params::<models::TriggerVariables>().await?
{
fire_pending_triggers(compiled, &variables, http_client).await?;
Member

🤔 while this is fine for a first pass, it blocks the transaction loop and these triggers could block for a while. I think we're going to ultimately want to tokio::spawn this and service the JoinHandle in the main protocol loop.

This can wait for a follow-up.

Member Author

I've re-structured things a bit so that the trigger completion only blocks just prior to where the rocksdb state update that would remove the persisted trigger variables is actually made durable in persist_max_keys. I think this is the latest it makes sense to specifically await their completion, being after the Load phase is nearly complete and just before sending Stores for the next phase.
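The ordering described in this reply can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses `std::thread` in place of `tokio::spawn`, and `deliver_triggers`, `run_transaction`, and the step names are hypothetical, not the PR's actual functions.

```rust
use std::thread;

/// Stand-in for webhook delivery (hypothetical). A real implementation
/// would send HTTP requests here.
fn deliver_triggers(fail: bool) -> Result<(), String> {
    if fail {
        Err("webhook delivery failed".to_string())
    } else {
        Ok(())
    }
}

/// Runs one simplified transaction and returns the steps in the order
/// they completed on the main loop.
fn run_transaction(fail_delivery: bool) -> Result<Vec<&'static str>, String> {
    let mut steps = Vec::new();

    // Start delivering the previous transaction's triggers in the background.
    let pending = thread::spawn(move || deliver_triggers(fail_delivery));

    // The main loop keeps working while delivery is in flight.
    steps.push("load phase");

    // Block only here, just before the state update that would clear the
    // persisted trigger variables. A delivery error stops the transaction.
    pending
        .join()
        .map_err(|_| "trigger thread panicked".to_string())??;
    steps.push("triggers delivered");

    steps.push("persist: clear trigger variables");
    Ok(steps)
}
```

If delivery fails, the function returns before the persist step, so in this sketch the pending variables are never cleared and would be retried.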

recv_client_open(&mut open, &db).await?;

// Extract trigger config and connector image from the spec.
let (triggers_json, connector_image) = {
Member

I think this should happen in runtime::materialize::connector, alongside where unseal::decrypt_sops() of the endpoint config lives for each of the image and local connector cases.

This would also more-naturally power eventual Validate-time validation, discussed elsewhere

Member Author

Done!

:::note
Triggers fire once per materialization transaction, not once per document. A
single transaction may contain documents from multiple bindings. The
`collection_names` template variable lists which collections contributed
Member

Right... for later, but reading this is making me realize that we may want resource paths to also be exposed to templates. That relates more closely to what the user may actually care about (which tables of mine were updated?).

@williamhbaker
Member Author

Addressed feedback in a couple of fixup! commits that I will squash in before merging.

Two comments I couldn't reply to inline:

> A thought: what about a wall-clock transaction time, such as the start time of the transaction from its own stats? More information bearing, monotonic, but also unique.

I have incorporated this, making the run_id the start time of the transaction. It'll be an RFC3339 timestamp string which seems as good as anything, though I did consider making it a Unix integer timestamp instead just to make it a little more opaque. But it should be something we can change later if there's a need to.
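For illustration, a run_id derived from a transaction start time might look like the sketch below. The function name `run_id_from_start` and the second-level precision are assumptions; the PR's own code uses a `clock_to_rfc3339` helper whose exact output format is not shown here. The sketch uses only the standard library, so the calendar conversion is written out by hand.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Formats a transaction start time as an RFC 3339 UTC timestamp with
/// second precision (hypothetical helper, not the PR's implementation).
fn run_id_from_start(start: SystemTime) -> String {
    // Times before the Unix epoch are clamped to the epoch in this sketch.
    let secs = start
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_secs();
    let (days, rem) = (secs / 86_400, secs % 86_400);

    // Days-to-civil-date conversion for the proleptic Gregorian calendar.
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = z / 146_097; // number of 400-year cycles
    let doe = z % 146_097; // day within the cycle
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // month index, March = 0
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year,
        month,
        day,
        rem / 3_600,
        (rem % 3_600) / 60,
        rem % 60
    )
}
```

One property of this fixed-width UTC format is that the strings sort lexicographically in chronological order, which a Unix integer rendered as a string would only do while the digit count stays the same.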

> Right... for later, but reading this is making me realize that we may want resource paths to also be exposed to templates. That relates more closely to what the user may actually care about (which tables of mine were updated?).

This is almost certainly right, and collection names aren't really what people are going to want to know. Resource paths should probably be made available too as you suggest, and I'd imagine some other things we'll find users want as well. So probably in a follow-up we'll add resource paths & more.

Member

@jgraettinger jgraettinger left a comment

LGTM

Adds a new triggers_json field (tag 10) to MaterializationSpec in
the proto definition, along with the generated Go and Rust bindings.
This field carries JSON-encoded trigger configurations that the runtime
will use to fire webhooks after materialization transactions commit.

Introduces the triggers module in the models crate with:
- Triggers / TriggerConfig / HttpMethod model types with JSON
  Schema annotations (including secret: true on header values)
- TriggerVariables for template rendering context
- Handlebars-based render_payload_template with strict mode and no
  HTML escaping
- HMAC-excluded field strip/restore helpers so that non-secret fields
  (payload_template, timeout, retries) can be modified without
  invalidating the SOPS HMAC over encrypted header secrets
- triggers_schema() for the encryption layer
- Optional triggers field on MaterializationDef
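To make "strict mode and no HTML escaping" concrete, here is a deliberately tiny stand-in renderer. It is not Handlebars and not the PR's render_payload_template: it only substitutes `{{name}}` placeholders, and the function name `render_strict` is hypothetical. It shows the two behaviors the commit message calls out: an unknown variable is an error rather than an empty string, and values are inserted verbatim.

```rust
use std::collections::BTreeMap;

/// Replaces each `{{name}}` with its value from `vars`.
/// Strict: an unknown name or an unclosed tag is an error.
/// Values are inserted verbatim, with no HTML escaping.
fn render_strict(template: &str, vars: &BTreeMap<&str, &str>) -> Result<String, String> {
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("{{") {
        // Copy the literal text that precedes the placeholder.
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        let end = after
            .find("}}")
            .ok_or_else(|| "unclosed placeholder".to_string())?;
        let name = after[..end].trim();
        let value = vars
            .get(name)
            .ok_or_else(|| format!("unknown variable `{}`", name))?;
        out.push_str(value);
        rest = &after[end + 2..];
    }
    out.push_str(rest);
    Ok(out)
}
```

With `run_id` and `materialization_name` bound, a template such as `{"run":"{{run_id}}"}` renders to a JSON body, while a template that references an unbound name fails instead of silently producing an empty field.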

Adds publication-time validation of materialization triggers:
- URL must be parseable
- timeout_secs must be > 0
- Payload template must be valid Handlebars that renders to valid JSON
  (validated against placeholder variables with placeholder header keys)

Includes the triggers and triggers_json fields in the built
MaterializationSpec, and updates all existing validation snapshots that
include built materialization specs.
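A rough sketch of the first two checks, collecting every problem into an error list rather than stopping at the first one. The struct `TriggerSketch` and the function `validate_trigger` are hypothetical, and the URL check is a simplified stand-in (scheme plus non-empty host) rather than a real URL parser; the template check is omitted because it needs a Handlebars engine and a JSON parser.

```rust
/// Minimal stand-in for one trigger's config (hypothetical shape).
struct TriggerSketch {
    url: String,
    timeout_secs: u64,
}

/// Appends one message per failed check to `errors`.
fn validate_trigger(index: usize, t: &TriggerSketch, errors: &mut Vec<String>) {
    // Simplified URL check: require an http(s) scheme and a non-empty host.
    // Assumption: the real validation uses a proper URL parser instead.
    let host = t
        .url
        .strip_prefix("https://")
        .or_else(|| t.url.strip_prefix("http://"))
        .map(|rest| rest.split('/').next().unwrap_or(""));
    match host {
        Some(h) if !h.is_empty() => {}
        _ => errors.push(format!("triggers[{}]: url is not parseable", index)),
    }

    if t.timeout_secs == 0 {
        errors.push(format!("triggers[{}]: timeout_secs must be > 0", index));
    }
}
```

Accumulating errors this way lets a single publication attempt report every invalid trigger at once.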
…tees

Adds the runtime machinery for firing trigger webhooks with at-least-once
delivery guarantees:

- triggers module: computes TriggerVariables from transaction state,
  renders Handlebars templates, and sends webhooks concurrently with
  exponential backoff and retry (non-retryable 4xx fails immediately)
- RocksDB persistence: trigger variables are written atomically with the
  connector's StartedCommit checkpoint and cleared after successful
  Acknowledged delivery, ensuring pending triggers survive crashes
- serve_session: extracts and SOPS-decrypts trigger configs from the
  materialization spec at session start, computes variables per
  transaction, and wires them through the commit/acknowledge protocol
- Refactors per-binding stats into a named BindingStats struct and
  tracks first_source_clock for flow_published_at_min
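The retry behavior in the first bullet could be sketched as below. Everything here is an assumption for illustration: the names `classify`, `backoff_delay`, and `deliver`, the 1-second base and 60-second cap, and in particular the choice to treat 408 and 429 as the retryable 4xx codes. The commit message only says that non-retryable 4xx responses fail immediately.

```rust
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum Outcome {
    Delivered,
    Retry,
    Fatal,
}

/// Classifies an HTTP status code for retry purposes.
fn classify(status: u16) -> Outcome {
    match status {
        200..=299 => Outcome::Delivered,
        // Assumption: timeouts and rate limits are worth retrying.
        408 | 429 => Outcome::Retry,
        400..=499 => Outcome::Fatal,
        _ => Outcome::Retry,
    }
}

/// Delay before the retry that follows `attempt` (0-based): doubles each
/// time, capped at `cap`.
fn backoff_delay(attempt: u32, base: Duration, cap: Duration) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt)).min(cap)
}

/// Calls `send` up to `max_attempts` times, invoking `sleep` between
/// retryable failures. Returns the number of attempts used on success.
fn deliver<F, S>(max_attempts: u32, mut send: F, mut sleep: S) -> Result<u32, String>
where
    F: FnMut() -> u16,
    S: FnMut(Duration),
{
    for attempt in 0..max_attempts {
        let status = send();
        match classify(status) {
            Outcome::Delivered => return Ok(attempt + 1),
            Outcome::Fatal => return Err(format!("non-retryable status {}", status)),
            Outcome::Retry => {
                // No point sleeping after the final attempt.
                if attempt + 1 < max_attempts {
                    sleep(backoff_delay(
                        attempt,
                        Duration::from_secs(1),
                        Duration::from_secs(60),
                    ));
                }
            }
        }
    }
    Err(format!("gave up after {} attempts", max_attempts))
}
```

Passing `sleep` as a parameter keeps the sketch testable without real waiting. In the runtime described above, an `Err` would leave the persisted trigger variables in place so delivery is attempted again after recovery.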

Extends the draft encryption pipeline to encrypt trigger header secrets
using the config-encryption service. Non-secret fields (payload_template,
timeout_secs, max_retries) are stripped before encryption and restored
afterward so they are not covered by the SOPS HMAC — allowing users to
modify these fields without re-entering secret header values.

Renames encrypt_endpoint_configs to encrypt_configs to reflect the
broader scope.
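The strip/encrypt/restore idea can be illustrated with a flat string map. This is a simplification: the real trigger config is nested and the PR's helpers operate on JSON, so `Config`, `strip_excluded`, and `restore_excluded` are hypothetical names, and the excluded field list is copied from the commit message above.

```rust
use std::collections::BTreeMap;

/// Flat stand-in for a trigger config (the real one is nested JSON).
type Config = BTreeMap<String, String>;

/// Field names excluded from the HMAC, as listed in the commit message.
const HMAC_EXCLUDED: [&str; 3] = ["payload_template", "timeout_secs", "max_retries"];

/// Removes the excluded fields from `config` and returns them, so that
/// only the remaining fields are sent for encryption.
fn strip_excluded(config: &mut Config) -> Config {
    let mut stripped = Config::new();
    for key in HMAC_EXCLUDED {
        if let Some(value) = config.remove(key) {
            stripped.insert(key.to_string(), value);
        }
    }
    stripped
}

/// Puts the excluded fields back after encryption.
fn restore_excluded(config: &mut Config, stripped: Config) {
    config.extend(stripped);
}
```

Because the stripped fields never pass through the encryption step, editing one of them (say, changing `timeout_secs` from 30 to 60) between strip and restore leaves the encrypted portion, and therefore its HMAC, untouched.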

Adds three WASM-exported functions for the UI's trigger configuration:
- get_trigger_config_schema: returns the JSON Schema for Triggers
- strip_trigger_hmac_excluded_fields: strips non-secret fields before
  sending to the config-encryption service
- restore_trigger_hmac_excluded_fields: restores them afterward

These mirror the Rust helpers and allow the browser-based UI to perform
the same strip/encrypt/restore workflow as flowctl.
@williamhbaker williamhbaker merged commit 2d071b5 into master Mar 24, 2026
12 checks passed
@williamhbaker williamhbaker deleted the wb/triggers branch March 24, 2026 21:41
github-actions bot pushed a commit to estuary/homebrew-flowctl that referenced this pull request Mar 25, 2026
This release includes support for the upcoming materialization triggers feature, in addition to several other bugfixes and improvements.

## What's Changed
- Fix double-slash in journal list selector breaking collection reads by @jgraettinger in estuary/flow#2718
- Include flowctl version in publication details by @psFried in estuary/flow#2748
- Improve error message for concurrent spec modifications by @jwhartley in estuary/flow#2678
- Add support for materialization triggers by @williamhbaker in estuary/flow#2800
williamhbaker added a commit to estuary/homebrew-flowctl that referenced this pull request Mar 25, 2026