Conversation
|
| .map(|c| clock_to_rfc3339(&c)) | ||
| .unwrap_or_default(); | ||
|
|
||
| let flow_run_id = uuid::Uuid::new_v4().to_string(); |
There was a problem hiding this comment.
I originally had a UUID here mostly as a placeholder, and then never came back around to thinking about it anymore. Any random string should do the trick, since it is durably persisted and recovered when retrying the webhook delivery.
But perhaps there is something more useful we could use here - I am considering a hex-encoded hash of the runtime checkpoint, for example.
There was a problem hiding this comment.
A thought: what about a wall-clock transaction time, such as the start time of the transaction from its own stats? More information bearing, monotonic, but also unique.
I don't think we'll regret a v4 UUID, but I'm also not convinced it rises to the level of deserving "an ID", which elevates it to a heaviness of concept / gravity of thing-ness that doesn't quite seem warranted.
crates/models/src/triggers.rs
Outdated
| pub headers: BTreeMap<String, String>, | ||
| /// # Handlebars template for the JSON payload body. | ||
| pub payload_template: String, | ||
| /// # Request timeout in seconds. |
There was a problem hiding this comment.
nit: perhaps note that the task is failed if timeout_secs + max_attempts is violated.
Also, use humantime rather than a _secs suffix. See how min_txn_duration / max_txn_duration work as examples.
There was a problem hiding this comment.
Added that note, and also switched to humantime for the timeout.
crates/models/src/triggers.rs
Outdated
| pub materialization_name: String, | ||
| pub flow_published_at_min: String, | ||
| pub flow_published_at_max: String, | ||
| pub flow_run_id: String, |
There was a problem hiding this comment.
nit: just run_id ? flow_published_at is a term of art we're pretty locked into, but in general we're moving away from "Estuary Flow" branding.
If this name has been discussed / agreed to already and I'm unaware, feel free to ignore.
There was a problem hiding this comment.
Changed to run_id! I don't think anything's been set in stone here, so that should be fine.
| Some(t) => { | ||
| validate_triggers(scope.push_prop("triggers"), &t.config, errors); | ||
|
|
||
| // TODO(whb): SOPS HMAC integrity of encrypted triggers is currently |
There was a problem hiding this comment.
We'd want to wire it into the Materialization Validate RPC, and have the runtime crate unseal as part of that call's implementation. That would happen within the data-plane that's intended to actually fire the triggers / has SOPs decrypt privilege.
Makes sense to do as a separate PR, though, and maybe not very high priority unless it's an issue in practice
There was a problem hiding this comment.
Oh yeah, that makes sense. Based on your other comment, I moved this decrypting into runtime::materialize::connector. It would be straightforward to include the trigger spec in the Validate RPC and attempt a decryption there...but yeah I'll wait to do that as a follow-up if its warranted.
| && let Some(variables) = | ||
| db.load_trigger_params::<models::TriggerVariables>().await? | ||
| { | ||
| fire_pending_triggers(compiled, &variables, http_client).await?; |
There was a problem hiding this comment.
🤔 while this is fine for a first pass, it blocks the transaction loop and these triggers could block for a while. I think we're going to ultimately want to tokio::spawn this and service the JoinHandle in the main protocol loop.
This can wait for a for a follow-up
There was a problem hiding this comment.
I've re-structured things a bit so that the trigger completion only blocks just prior to where the rocksdb state update that would remove the persisted trigger variables is actually made durable in persist_max_keys. I think this is the latest it makes sense to specifically await their completion, being after the Load phase is nearly complete and just before sending Stores for the next phase.
| recv_client_open(&mut open, &db).await?; | ||
|
|
||
| // Extract trigger config and connector image from the spec. | ||
| let (triggers_json, connector_image) = { |
There was a problem hiding this comment.
I think this should happen in runtime::materialize::connector, alongside where unseal::decrypt_sops() of the endpoint config lives for each of the image and local connector cases.
This would also more-naturally power eventual Validate-time validation, discussed elsewhere
| .map(|c| clock_to_rfc3339(&c)) | ||
| .unwrap_or_default(); | ||
|
|
||
| let flow_run_id = uuid::Uuid::new_v4().to_string(); |
There was a problem hiding this comment.
A thought: what about a wall-clock transaction time, such as the start time of the transaction from its own stats? More information bearing, monotonic, but also unique.
I don't think we'll regret a v4 UUID, but I'm also not convinced it rises to the level of deserving "an ID", which elevates it to a heaviness of concept / gravity of thing-ness that doesn't quite seem warranted.
| :::note | ||
| Triggers fire once per materialization transaction, not once per document. A | ||
| single transaction may contain documents from multiple bindings. The | ||
| `collection_names` template variable lists which collections contributed |
There was a problem hiding this comment.
Right... for later, but reading this is making me realize that we may want resource paths to also be exposed to templates. That relates more closely to what the user may actually care about (which tables of mine were updated?).
|
Addressed feedback in a couple of Two comments I couldn't reply to inline:
I have incorporated this, making the
This is almost certainly right, and collection names aren't really what people are going to want to know. Resource paths should probably be made available too as you suggest, and I'd imagine some other things we'll find users want as well. So probably in a follow-up we'll add resource paths & more. |
Adds a new triggers_json field (tag 10) to MaterializationSpec in the proto definition, along with the generated Go and Rust bindings. This field carries JSON-encoded trigger configurations that the runtime will use to fire webhooks after materialization transactions commit.
Introduces the triggers module in the models crate with: - Triggers / TriggerConfig / HttpMethod model types with JSON Schema annotations (including secret: true on header values) - TriggerVariables for template rendering context - Handlebars-based render_payload_template with strict mode and no HTML escaping - HMAC-excluded field strip/restore helpers so that non-secret fields (payload_template, timeout, retries) can be modified without invalidating the SOPS HMAC over encrypted header secrets - triggers_schema() for the encryption layer - Optional triggers field on MaterializationDef
Adds publication-time validation of materialization triggers: - URL must be parseable - timeout_secs must be > 0 - Payload template must be valid Handlebars that renders to valid JSON (validated against placeholder variables with placeholder header keys) Includes the triggers and triggers_json fields in the built MaterializationSpec, and updates all existing validation snapshots that include built materialization specs.
…tees Adds the runtime machinery for firing trigger webhooks with at-least-once delivery guarantees: - triggers module: computes TriggerVariables from transaction state, renders Handlebars templates, and sends webhooks concurrently with exponential backoff and retry (non-retryable 4xx fails immediately) - RocksDB persistence: trigger variables are written atomically with the connector's StartedCommit checkpoint and cleared after successful Acknowledged delivery, ensuring pending triggers survive crashes - serve_session: extracts and SOPS-decrypts trigger configs from the materialization spec at session start, computes variables per transaction, and wires them through the commit/acknowledge protocol - Refactors per-binding stats into a named BindingStats struct and tracks first_source_clock for flow_published_at_min
Extends the draft encryption pipeline to encrypt trigger header secrets using the config-encryption service. Non-secret fields (payload_template, timeout_secs, max_retries) are stripped before encryption and restored afterward so they are not covered by the SOPS HMAC — allowing users to modify these fields without re-entering secret header values. Renames encrypt_endpoint_configs to encrypt_configs to reflect the broader scope.
Adds three WASM-exported functions for the UI's trigger configuration: - get_trigger_config_schema: returns the JSON Schema for Triggers - strip_trigger_hmac_excluded_fields: strips non-secret fields before sending to the config-encryption service - restore_trigger_hmac_excluded_fields: restores them afterward These mirror the Rust helpers and allow the browser-based UI to perform the same strip/encrypt/restore workflow as flowctl.
863bf76 to
0132bba
Compare
0132bba to
b22b045
Compare
This release includes support for the upcoming materialization triggers feature, in addition to several other bugfixes and improvements. ## What's Changed - Fix double-slash in journal list selector breaking collection reads by @jgraettinger in estuary/flow#2718 - Include flowctl version in publication details by @psFried in estuary/flow#2748 - Improve error message for concurrent spec modifications by @jwhartley in estuary/flow#2678 - Add support for materialization triggers by @williamhbaker in estuary/flow#2800
This release includes support for the upcoming materialization triggers feature, in addition to several other bugfixes and improvements. ## What's Changed - Fix double-slash in journal list selector breaking collection reads by @jgraettinger in estuary/flow#2718 - Include flowctl version in publication details by @psFried in estuary/flow#2748 - Improve error message for concurrent spec modifications by @jwhartley in estuary/flow#2678 - Add support for materialization triggers by @williamhbaker in estuary/flow#2800
Description:
Adds a new triggers configuration to materializations that fires webhook requests after each committed transaction. This enables users to notify downstream services (e.g. kick off a dbt job) when new data has been materialized.
See individual commit messages for specifics on the implementation, as well as this design discussion where higher-level points are described.
Testing:
flowctlto publish a config testing payload delivery to webhook.site. Usingflowctl, you can change the non-secret, non-HMAC protected values without decrypting the config.These things are called "Triggers" as an overarching concept, even though they are webhooks, and the terminology of webhooks is inevitably mixed in. I wanted to use a more general term for the feature name, since it is perhaps conceivable that other kinds of triggers could be imagined in the future.
Workflow steps:
Create/edit materialization configs with the new
triggersfield, see the docs with this PR for examples.Documentation links affected:
Some initial docs are included in this PR.
Notes for reviewers:
The additions to
flow-webare somewhat speculative. I'm fairly confident the UI will need the triggers config schema for rendering a form eventually. The helpersstrip_trigger_hmac_excluded_fields&restore_trigger_hmac_excluded_fieldsmay provide equivalent behaviors as are built intoflowctlto manipulate encrypted configs, but how this will actually be implemented in the UI form handling is TBD.