External Triggers for Materializations: Implementation Decisions #2713
Replies: 2 comments
This makes lots of sense. Some feedback around the structure: I believe we can model it as something like:

```rust
pub struct Triggers {
    pub config: Vec<TriggerConfig>,
    #[serde(default)]
    #[schemars(hide)]
    pub sops: Option<models::RawValue>,
}

pub struct TriggerConfig {
    pub url: String,
    pub method: String,
    // auth: {additionalProperties: {type: string, secret: true}}
    #[schemars(extra_annotation = "secret: true")]
    pub auth: BTreeMap<String, String>,
    pub payload_template: String,
    // ...
}
```

Key ideas:
We definitely need to protect the secret fields. I think we can do this by having a logic layer above which strips these fields from the documents submitted for encrypt / decrypt, and then re-applies them to the post-transformed model.
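To illustrate the idea, a minimal sketch of that strip / re-apply layer, using plain structs (all names here are hypothetical, not an actual Flow API):

```rust
use std::collections::BTreeMap;

#[derive(Clone, Debug, PartialEq)]
struct TriggerConfig {
    url: String,
    method: String,
    auth: BTreeMap<String, String>, // secret: true
    payload_template: String,
}

/// Remove the secret fields, returning them so they can be held aside
/// while the rest of the document goes through encrypt / decrypt.
fn strip_secrets(cfg: &mut TriggerConfig) -> BTreeMap<String, String> {
    std::mem::take(&mut cfg.auth)
}

/// Re-apply the held-aside secrets to the post-transformed model.
fn reapply_secrets(cfg: &mut TriggerConfig, auth: BTreeMap<String, String>) {
    cfg.auth = auth;
}

fn main() {
    let mut cfg = TriggerConfig {
        url: "https://example.com/hook".into(),
        method: "POST".into(),
        auth: BTreeMap::from([("Authorization".into(), "Bearer tok".into())]),
        payload_template: "{}".into(),
    };
    let held = strip_secrets(&mut cfg);
    assert!(cfg.auth.is_empty()); // now safe to submit for encrypt / decrypt
    reapply_secrets(&mut cfg, held);
    assert_eq!(cfg.auth.len(), 1); // secrets restored on the transformed model
}
```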
Two other notes before I forget:
External Triggers for Materializations
This feature adds webhook triggers to materializations. When a materialization transaction commits, configured webhooks fire with metadata about the transaction. This lets users integrate external systems (Airflow, dbt, Slack, etc.) that need to react when new data lands.
Ref: Internal PRD document
This discussion covers the design choices I'd like input on.
Templating
Trigger payloads are Handlebars templates rendered against a set of variables. Handlebars is already used for email templates in the `notifications` crate, and it supports iteration (needed for `collection_names`). Templates run in strict mode (unknown variables are errors) with no HTML escaping (payloads are JSON, not HTML).
At publish time, validation trial-renders every template with placeholder values and checks that the output is valid JSON.
Trigger variables
These are the variables available in templates:
- `materialization_name` (string): `task.shard_ref.name`, the full materialization name
- `collection_names` (string[])
- `connector_image` (string): e.g. `ghcr.io/estuary/materialize-postgres:v1`
- `flow_published_at_max` (string, RFC 3339): the maximum `_meta/uuid` clock across all documents in the transaction
- `flow_published_at_min` (string, RFC 3339): the minimum `_meta/uuid` clock across all documents in the transaction (new)
- `flow_run_id` (string)

Variables from the PRD that are not planned to be supported initially:

- `transaction_started_at` and `transaction_completed_at`: using min/max `flow_published_at` instead
- `updated_documents`: not practical to list every updated document's key
- `deployment_env`: unsure of what this refers to

Example config
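As a sketch, a trigger configuration inside a materialization spec might look like the following (the exact placement and field names, including the `triggers` key itself, are assumptions rather than a final schema):

```yaml
materializations:
  acmeCo/reports/materialize-postgres:
    endpoint:
      connector:
        image: ghcr.io/estuary/materialize-postgres:v1
        config: endpoint.sops.yaml
    # Hypothetical placement of the trigger config:
    triggers:
      config:
        - url: https://hooks.example.com/flow
          method: POST
          auth:
            Authorization: Bearer my-token # secret; sops-encrypted
          payload_template: >-
            {"materialization": "{{materialization_name}}",
             "published_at_max": "{{flow_published_at_max}}"}
```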
Trigger config encryption
Trigger configs contain auth credentials (bearer tokens, API keys, passwords). These need to be SOPS-encrypted, like connector endpoint configs.
Connector endpoint configs are handled transparently as raw bytes by the control plane and runtime, decrypting them when needed to run connectors. This is a bit awkward for trigger configurations, because their models are defined in Flow, rather than a connector, and the control plane and runtime need to deserialize the configurations into these typed models for validation and execution.
The solution I've come up with is to mostly handle them as raw bytes, so they can be encrypted the same way as connector configs, and to deserialize them only when needed. Full decryption & deserialization is needed for executing the triggers while a materialization is running; for validation, only the trigger payload needs to be extracted.
So `MaterializationDef.triggers` is typed as `Option<RawValue>` rather than `Option<Triggers>`. The built spec also carries the trigger config as opaque bytes.
In the publish path (via `flowctl` in `draft/encrypt.rs`, or a future UI workflow), triggers are encrypted similarly to endpoint configs, using the same config encryption API. This requires two calls to the encryption service when triggers are configured: one for the endpoint config, and an additional one for the trigger config.

Getting the JSON schema to the UI
The UI will need the trigger config schema to render a form editor. This could be exposed through `flow-web`, with a simple helper function. I also tossed around the idea of a new column / table in Supabase to hold this config for queries, or somehow injecting the config into every materialization's endpoint config (likely impractical), but that did not seem like an improvement over the `flow-web` function.

At-least-once delivery
Triggers must fire at least once per committed transaction. They may fire more than once, and are likely to do so from time to time. The mechanism:

1. `TriggerVariables` are computed from the transaction's stats.
2. In `StartCommit`, trigger variables are serialized to RocksDB under a new `"trigger-params"` key, atomically in the same `WriteBatch` as the runtime checkpoint.
3. After the `Acknowledged` response, the runtime reads `"trigger-params"` from RocksDB, renders templates, and delivers webhooks concurrently with retries.
4. `"trigger-params"` is deleted from RocksDB. This update may not be durably committed to the recovery log until the next transaction's Load phase is complete, similar to how connector state updates from `Acknowledged` are made durable.

If the shard crashes after step 2 but before step 4, the trigger variables survive in RocksDB. On recovery, the next `Acknowledged` will re-read and re-fire the webhooks.

Permanently failing trigger webhooks will crash the shard and block the materialization. The `maxRetries` and `timeoutSecs` config give users control over how aggressively to retry before crashing. Persisting just the `TriggerVariables` rather than the entire webhook payload allows for correcting things like an invalidated API key, rather than dooming the task to send a payload which will never be valid again.
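The steps above can be sketched without dependencies, using an in-memory map as a stand-in for RocksDB (names are illustrative; in the real runtime the checkpoint and trigger variables share one atomic `WriteBatch`):

```rust
use std::collections::BTreeMap;

/// Stand-in for the shard's RocksDB store (illustrative only).
type Store = BTreeMap<String, String>;

const TRIGGER_PARAMS: &str = "trigger-params";

/// StartCommit: persist the checkpoint and trigger variables together.
fn start_commit(store: &mut Store, checkpoint: &str, trigger_vars: &str) {
    store.insert("checkpoint".to_string(), checkpoint.to_string());
    store.insert(TRIGGER_PARAMS.to_string(), trigger_vars.to_string());
}

/// Acknowledged: read the persisted variables, deliver webhooks, then clear.
/// If the shard crashed before this ran, the key survives and delivery is
/// retried on recovery, giving at-least-once semantics.
fn on_acknowledged(store: &mut Store, mut deliver: impl FnMut(&str)) {
    if let Some(vars) = store.get(TRIGGER_PARAMS).cloned() {
        deliver(&vars); // render templates & fire webhooks with retries
        store.remove(TRIGGER_PARAMS);
    }
}

fn main() {
    let mut store = Store::new();
    start_commit(&mut store, "ckpt-1", r#"{"materialization_name":"acme/m"}"#);

    // If the shard crashed here, the key would still be present on
    // recovery, so the next Acknowledged re-fires the webhooks.
    assert!(store.contains_key(TRIGGER_PARAMS));

    let mut fired = Vec::new();
    on_acknowledged(&mut store, |vars| fired.push(vars.to_string()));
    assert_eq!(fired.len(), 1);
    assert!(!store.contains_key(TRIGGER_PARAMS));
}
```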