[Alerting] Alerting v2: Add support of streaming in the rule executor#252544
[Alerting] Alerting v2: Add support of streaming in the rule executor#252544cnasikas merged 30 commits intoelastic:alerting_v2from
Conversation
fb2e0e7 to
1c53214
Compare
1c53214 to
d10ad5f
Compare
be09bc4 to
ac625c6
Compare
| strategy, | ||
| }) | ||
| executionContext.throwIfAborted(); | ||
| return this.processAlertEvents(rule, alertEvents, strategy, executionContext); |
There was a problem hiding this comment.
The director works on the batch that came from the previous step stream. We may need to change the director to own multi-batch iteration in the future (e.g., accumulating events across batches). I decided not to do it in this PR and wait for the stress testing to see how streaming behaves. I tested with 10K docs, and the batches are of ~240 items.
|
|
||
| export interface QueryServiceContract { | ||
| executeQuery(params: ExecuteQueryParams): Promise<EsqlQueryResponse>; | ||
| executeQueryRows<T = Record<string, unknown>>(params: ExecuteQueryParams): Promise<T[]>; |
There was a problem hiding this comment.
I can merge executeQueryRows with executeQuery if you like and always return an array of records.
There was a problem hiding this comment.
what do you think if we mark executeQuery as deprecated, and we will move consumers to executeQueryRows (or streaming) as we work on them, and then we can remove executeQuery when no more usage?
4b3a502 to
5c49952
Compare
4b9b591 to
fc38099
Compare
| }); | ||
| await this.storageService.bulkIndexDocs({ | ||
| index: ALERT_EVENTS_DATA_STREAM, | ||
| docs: requiredState.state.alertEventsBatch, |
There was a problem hiding this comment.
Same as the director, we may need to change the storage service to own multi-batch iteration in the future (e.g., accumulating events across batches). I decided not to do it in this PR and wait for the stress testing to see how streaming behaves. I tested with 10K docs, and the batches are of ~240 items.
There was a problem hiding this comment.
You mean not processing the bulk per batch?
There was a problem hiding this comment.
We always work in bulk per batch. What I mean is that the store stream would collect more batches from the other streams before storing the events to ES. Different pace than the other streams. Imagine the ESQL query stream sends one batch, the store stream stores that in memory and waits for the next batch. The query streams keep sending batches to the store stream. The store stream at some point after N batches decides to send them as one bulk request to ES. I decided not do it in this PR and always do one ES call per batch.
…ally, or add an exception to src/dev/yarn_deduplicate/index.ts and then commit the changes and push to your branch
…otstrap' locally and then commit the changes and push to your branch
|
Pinging @elastic/response-ops (Team:ResponseOps) |
kdelemme
left a comment
There was a problem hiding this comment.
Did a first pass, I have a few questions but otherwise looks good to me. Not gonna lie, this PR is very low level, I've been asking Claude for explaining bits here and there.
| try { | ||
| await disposer(); | ||
| } catch (error) { | ||
| firstError ??= error; |
There was a problem hiding this comment.
I keep forgetting about this operator
| const unsubscribe = this.onAbort(() => { | ||
| scope.disposeAll().catch(() => { | ||
| // Disposal errors during abort are best-effort. | ||
| // The primary abort reason is already propagated through | ||
| // the stream/pipeline error path. | ||
| }); | ||
| }); | ||
|
|
||
| scope.add(unsubscribe); |
| this.logger.debug({ message: `[${this.name}] State not ready, halting` }); | ||
| return { type: 'halt', reason: 'state_not_ready' }; | ||
| } | ||
| const requiredState = requireState(state, ['rule', 'esqlRowBatch']); |
There was a problem hiding this comment.
Nice I'll steal this for the dispatcher pipeline
| if (!requiredState.ok) { | ||
| step.logger.debug({ message: `[${step.name}] State not ready, halting` }); | ||
| yield requiredState.result; | ||
| return; | ||
| } |
There was a problem hiding this comment.
In under what conditions the state would not be ready? and does the full pipeline stops?
There was a problem hiding this comment.
It should never be the case. It is a guardrail in case we mess up with the order of the steps, or we forget to omit a value from a step. Steps are agnostic on the other steps, so they should not assume the other steps will pass down the correct values. Yes, the full pipeline will stop.
| const requiredState = requireState(state, ['rule']); | ||
|
|
||
| const { rule } = state; | ||
| if (!requiredState.ok) { | ||
| step.logger.debug({ message: `[${step.name}] State not ready, halting` }); | ||
| yield requiredState.result; | ||
| return; | ||
| } |
There was a problem hiding this comment.
We are doing this in every step, should we move this logic into the requireState and maybe call it assertValidStateForStep(state, keys, step) or something like that?
There was a problem hiding this comment.
nevermind, I saw we use it in non generator function as well (mapStep)
| }); | ||
| await this.storageService.bulkIndexDocs({ | ||
| index: ALERT_EVENTS_DATA_STREAM, | ||
| docs: requiredState.state.alertEventsBatch, |
There was a problem hiding this comment.
You mean not processing the bulk per batch?
| // Timestamp when the alert event is written to the index. | ||
| const wroteAt = new Date().toISOString(); | ||
| const source = 'internal'; | ||
| let index = 0; |
| const activeGroupHashes = await step.fetchActiveAlertGroupHashes( | ||
| rule.id, | ||
| input.executionContext | ||
| ); |
There was a problem hiding this comment.
It calls fetchActiveAlertGroupHashes on every batch, querying ES for groups that are currently in an active episode state. But StoreAlertEventsStep downstream writes batches with refresh: 'wait_for'.
When 2nd batch CreateRecoveryEventsStep runs, batch 1 has already been stored (with refresh: 'wait_for'). The active group hashes query now sees batch 1's data.
You mentioned to change the director to own multi-batch iteration in the future. Shall we address this in a follow up?
There was a problem hiding this comment.
Additional observation, for each batch, the pipeline makes:
- Recovery step: fetchActiveAlertGroupHashes — 1 ES|QL query
- Recovery step (if query type): recovery query — 1 more ES|QL query (probably won't be used often, but still)
- Director step: fetchLatestAlertStateByGroupHash — 1 ES|QL query
That's 2-3 ES round-trips per batch, on top of the bulk index. For 42 batches, that's 84-126 additional ES queries per rule execution. Each is a full ES|QL query with parsing overhead.
So the director and recovery steps don't benefit from streaming, they do per-batch ES lookups that could be batched or cached across the pipeline run.
There was a problem hiding this comment.
It calls fetchActiveAlertGroupHashes on every batch, querying ES for groups that are currently in an active episode state. But StoreAlertEventsStep downstream writes batches with refresh: 'wait_for'.
When 2nd batch CreateRecoveryEventsStep runs, batch 1 has already been stored (with refresh: 'wait_for'). The active group hashes query now sees batch 1's data.
This is by design. Initially, the director was an async task outside of the rule executor. The same could happen there. For queries that have a group by, it will always be one record per group, and this scenario you are describing will never happen. It will happen for queries that do not group or return more records per group.
You mentioned to change the director to own multi-batch iteration in the future. Shall we address this in a follow up?
It will not resolve the issue. At some point, the director needs to stop fetching to avoid out-of-memory issues. So at some point, the director would see the N-1 stored in the ES batch data. Also, it is not trivial how the multi-batch should work. It would be that one batch is very big and the others are very small. You have to know how much free memory you have at a time before asking for another batch. Same, one batch could contain hundreds of group hashes and another a few. The queries are dependent on the number of group hashes.
That's 2-3 ES round-trips per batch, on top of the bulk index. For 42 batches, that's 84-126 additional ES queries per rule execution. Each is a full ES|QL query with parsing overhead.
So the director and recovery steps don't benefit from streaming, they do per-batch ES lookups that could be batched or cached across the pipeline run.
This is a very good observation, and we could optimize the trips by adding a step that will do one query for each batch that will contain all the information needed by the Recovery and Director Step. I think we need to measure to better understand how the streaming is performed under stress. Doing so many ES queries per rule execution is not acceptable. Still, we cannot avoid 2-3 queries per batch. By adding streaming support, we tradeoff low memory and CPU footprint for doing more ESQL queries. Let's brainstorm offline to see what we could do.
There was a problem hiding this comment.
Not related to you changes, but was looking at the performance bottleneck with Opus at your rule executor pipeline.
refresh: 'wait_for' blocks until the next ES refresh cycle makes the documents searchable. With default refresh interval of 1 second, each batch write blocks for up to 1 second.
For a streaming pipeline designed for throughput, blocking on index refresh per batch defeats much of the streaming benefit.
Suggested:
- refresh: false and doing a single _refresh at the end
- only using wait_for on the final batch.
There was a problem hiding this comment.
Good point. I will set it to false and do a single _refresh at the end of the pipeline execution.
| const requiredState = requireState(state, ['rule', 'alertEventsBatch']); | ||
|
|
||
| if (!requiredState.ok) { | ||
| step.logger.debug({ message: `[${step.name}] State not ready, halting` }); | ||
| yield requiredState.result; | ||
| return; | ||
| } |
There was a problem hiding this comment.
Every step repeats this 5-line pattern.
We can add these helpers:
// stream_utils.ts
export const guardedExpandStep = <K extends OptionalStateKey>(
input: PipelineStateStream,
requiredKeys: readonly K[],
handler: (state: StateWith<K>) => AsyncIterable<StepStreamResult>
): PipelineStateStream =>
expandStep(input, async function* (state) {
const result = requireState(state, requiredKeys);
if (!result.ok) {
yield result.result;
return;
}
yield* handler(result.state);
});
export const guardedMapStep = <K extends OptionalStateKey>(
input: PipelineStateStream,
requiredKeys: readonly K[],
handler: (state: StateWith<K>) => Promise<StepStreamResult> | StepStreamResult
): PipelineStateStream =>
mapStep(input, async (state) => {
const result = requireState(state, requiredKeys);
if (!result.ok) {
return result.result;
}
return handler(result.state);
});To the usage changes to:
return guardedExpandStep(streamState, ['rule', 'esqlRowBatch'], async function* (state) {
if (!buildBatch) {
buildBatch = createAlertEventsBatchBuilder({
ruleId: state.input.ruleId,
spaceId: state.input.spaceId,
ruleAttributes: state.rule,
scheduledTimestamp: state.input.scheduledAt,
ruleVersion: 1,
});
}
const alertEventsBatch = buildBatch([...state.esqlRowBatch]);
if (alertEventsBatch.length > 0) {
yield { type: 'continue', state: { ...state, alertEventsBatch } };
}
});|
Suggestion: The
// execution_context_store.ts
const store = new AsyncLocalStorage<ExecutionContext>();
export const runWithExecutionContext = <T>(ctx: ExecutionContext, fn: () => T): T =>
store.run(ctx, fn);
export const getExecutionContext = (): ExecutionContext => {
const ctx = store.getStore();
if (!ctx) throw new Error('No ExecutionContext — called outside rule execution');
return ctx;
};Pipeline wraps the execution once: return runWithExecutionContext(executionContext, async () => { /* run steps */ });Then services, director, and steps call This would:
Not blocking. But as more services need cancellation-awareness, the threading cost grows. |
⏳ Build in-progress, with failures
Failed CI Steps
Test Failures
History
cc @cnasikas |
## Summary ### Key capabilities - **ES|QL-native rule evaluation** — Rules are defined as ES|QL queries with optional WHERE clause conditions, evaluated on a configurable schedule - **Alert lifecycle management** — Full episode tracking with pending → active → recovering → inactive state transitions, including configurable alert delay (consecutive breaches / duration) - **Event-driven architecture** — Alert events and actions are stored in dedicated data streams (`.alerting-events`, `.alerting-actions`) with ES|QL views for querying - **Notification dispatch pipeline** — A multi-step dispatcher that matches alert episodes to notification policies, handles throttling/suppression, and triggers Kibana Workflows using encrypted API keys - **Notification policies** — CRUD APIs and UI for creating notification policies with KQL-based rule matching, workflow integration, and API key management - **Rule authoring UI** — A shared rule form package (`@kbn/alerting-v2-rule-form`) usable standalone or embedded in Discover, with ES|QL editor, WHERE clause condition editing, recovery configuration, and live query preview - **Rule management UI** — Full rule list with pagination, enable/disable, clone, edit, and delete operations - **APM instrumentation** — Middleware and decorators for tracing rule execution and client operations ### Architecture highlights - **InversifyJS DI** — All services use constructor injection with typed tokens, scoped per-request or singleton as appropriate - **Pipeline pattern** — Rule executor and dispatcher use composable step-based pipelines - **Saved Objects** — Rules stored as hidden saved objects; notification policies stored as encrypted saved objects (for API key protection) - **Feature privileges** — Dedicated Kibana feature with read/all privileges for RBAC --- ## Contained PRs <details> <summary><strong>Core Engine & Plugin Init</strong> (12 PRs)</summary> - #247283 — Init alerting v2 plugin (@cnasikas) - #247452 — Add the alerting v2 feature privileges (@cnasikas) - #247673 — Director (@cnasikas) - #248306 — Create basic services (@cnasikas) - #248696 — Initialize all resources (@cnasikas) - #250023 — Schema package (@cnasikas) - #250010 — YML Editor (@cnasikas) - #251064 — Remove index.mode: lookup for RnA alert indices (@cnasikas) - #251707 — Simplify task registration pattern (@kdelemme) - #251876 — Dedicated user service (@cnasikas) - #252073 — Use `kbn/data-streams` in alerting_v2 (@cnasikas) - #255120 — Update alerting-v2 owner to new rna project team (@cnasikas) </details> <details> <summary><strong>Rule Execution Pipeline</strong> (12 PRs)</summary> - #247472 — Add alerting v2 Rule Executor (@darnautov) - #248285 — Alerting v2 rule HTTP APIs (@darnautov) - #248728 — Add basic alert actions route (@darnautov) - #250161 — Refactor rule executor to use a pipeline pattern (@darnautov) - #252292 — Implement the CountTimeframeStrategy for the director (@cnasikas) - #252544 — Add support of streaming in the rule executor (@darnautov) - #252754 — Update rule attributes (@kdelemme) - #253355 — Add getRules client method (@kdelemme) - #253668 — Make evaluation.query.condition optional (@kdelemme) - #254031 — Add recovery event generation to rule execution pipeline (@kdelemme) - #255968 — ES|QL views (@adcoelho) - #256697 — Create episodes ES|QL view (@adcoelho) </details> <details> <summary><strong>Alert Suppression & Episodes</strong> (3 PRs)</summary> - #252174 — Alert suppression (@kdelemme) - #256486 — Fix suppression query (@kdelemme) - #256527 — Store 'unmatched' action for unmatched alert episodes (@kdelemme) </details> <details> <summary><strong>Dispatcher & Notification Engine</strong> (6 PRs)</summary> - #250822 — Alerting v2 dispatcher (@kdelemme) - #251529 — Use query service in dispatcher (@kdelemme) - #251679 — Dispatcher task (@kdelemme) - #252758 — Dispatcher notification policy (@kdelemme) - #255332 — Wait for resources before scheduling dispatcher task (@kdelemme) - #256536 — Use stored encrypted API keys from Notification Policy in dispatcher step (@kdelemme) </details> <details> <summary><strong>Notification Policies (Server)</strong> (4 PRs)</summary> - #251336 — Introduce notification policy CRUD APIs and client (@cnasikas) - #253134 — Update notification policy (@cnasikas) - #254808 — Store API key owner on Notification Policy (@kdelemme) - #256940 — Make notification policies global with optional rule-label scoping (@kdelemme) </details> <details> <summary><strong>Notification Policies UI</strong> (1 PR)</summary> - #255599 — Add notification policies UI and Storybook form story (@adcoelho) </details> <details> <summary><strong>Rule Authoring UI</strong> (13 PRs)</summary> - #250961 — Add create rule flyout in Discover (@adcoelho) - #255111 — Add activation configuration fields to alerting V2 rule form (@yiannisnikolopoulos) - #255427 — Rule form: provide services via context (@dominiqueclarke) - #255876 — MVP rule form, Split evaluation condition, and Recovery configuration (@dominiqueclarke) - #256260 — Foundational rule list (@dominiqueclarke) - #256756 — Wire up edit flow (@dominiqueclarke) - #256801 — Move consecutive breaches max to shared constants (@yiannisnikolopoulos) - #256818 — Preview query and design parity (@dominiqueclarke) - #256938 — Allow clearing number inputs in state transition fields (@yiannisnikolopoulos) - #257017 — Add enable/disable and clone rule to rule list (@dominiqueclarke) - #257246 — Remove all React.FC (@dominiqueclarke) - #257415 — Rule form - fix test (@dominiqueclarke) - #257454 — Block comma key in number input component (@yiannisnikolopoulos) </details> <details> <summary><strong>API Documentation & Schema</strong> (2 PRs)</summary> - #254901 — Rename indexes for alert events and actions (@adcoelho) - #255810 — OAS for alert action routes (@adcoelho) </details> <details> <summary><strong>Observability & Monitoring</strong> (3 PRs)</summary> - #254925 — Add ApmMiddleware to the rule executor (@adcoelho) - #255115 — Add the withAPM decorator and apply it to the rules_client (@adcoelho) - #255999 — Fix linting problem in apm middleware (@adcoelho) </details> <details> <summary><strong>CI & Maintenance</strong> (2 PRs)</summary> - #257409 — Refactor SO services to use inversify DI for client initialization (@darnautov) - Fix alerting-v2-schema jest config (@darnautov) </details> --- --------- Co-authored-by: Dima Arnautov <dmitrii.arnautov@elastic.co> Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kevin Delemme <kevin.delemme@elastic.co> Co-authored-by: Mike Côté <mikecote@users.noreply.github.com> Co-authored-by: Antonio <antonio.coelho@elastic.co> Co-authored-by: Kevin Delemme <kdelemme@gmail.com> Co-authored-by: Dominique Clarke <dominique.clarke@elastic.co> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.rhodes@elastic.co> Co-authored-by: Yiannis Nikolopoulos <yiannis.nikolopoulos@elastic.co> Co-authored-by: Alexi Doak <109488926+doakalexi@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Bailey Cash <bailey.cash@elastic.co> Co-authored-by: Anna Davydova <ana.davydova@elastic.co> Co-authored-by: Umberto Pepato <umbopepato@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.matthew.rhodes@gmail.com> Co-authored-by: Joana Cardoso <169058851+joana-cps@users.noreply.github.com>
## Summary ### Key capabilities - **ES|QL-native rule evaluation** — Rules are defined as ES|QL queries with optional WHERE clause conditions, evaluated on a configurable schedule - **Alert lifecycle management** — Full episode tracking with pending → active → recovering → inactive state transitions, including configurable alert delay (consecutive breaches / duration) - **Event-driven architecture** — Alert events and actions are stored in dedicated data streams (`.alerting-events`, `.alerting-actions`) with ES|QL views for querying - **Notification dispatch pipeline** — A multi-step dispatcher that matches alert episodes to notification policies, handles throttling/suppression, and triggers Kibana Workflows using encrypted API keys - **Notification policies** — CRUD APIs and UI for creating notification policies with KQL-based rule matching, workflow integration, and API key management - **Rule authoring UI** — A shared rule form package (`@kbn/alerting-v2-rule-form`) usable standalone or embedded in Discover, with ES|QL editor, WHERE clause condition editing, recovery configuration, and live query preview - **Rule management UI** — Full rule list with pagination, enable/disable, clone, edit, and delete operations - **APM instrumentation** — Middleware and decorators for tracing rule execution and client operations ### Architecture highlights - **InversifyJS DI** — All services use constructor injection with typed tokens, scoped per-request or singleton as appropriate - **Pipeline pattern** — Rule executor and dispatcher use composable step-based pipelines - **Saved Objects** — Rules stored as hidden saved objects; notification policies stored as encrypted saved objects (for API key protection) - **Feature privileges** — Dedicated Kibana feature with read/all privileges for RBAC --- ## Contained PRs <details> <summary><strong>Core Engine & Plugin Init</strong> (12 PRs)</summary> - elastic#247283 — Init alerting v2 plugin (@cnasikas) - elastic#247452 — Add the alerting v2 feature privileges (@cnasikas) - elastic#247673 — Director (@cnasikas) - elastic#248306 — Create basic services (@cnasikas) - elastic#248696 — Initialize all resources (@cnasikas) - elastic#250023 — Schema package (@cnasikas) - elastic#250010 — YML Editor (@cnasikas) - elastic#251064 — Remove index.mode: lookup for RnA alert indices (@cnasikas) - elastic#251707 — Simplify task registration pattern (@kdelemme) - elastic#251876 — Dedicated user service (@cnasikas) - elastic#252073 — Use `kbn/data-streams` in alerting_v2 (@cnasikas) - elastic#255120 — Update alerting-v2 owner to new rna project team (@cnasikas) </details> <details> <summary><strong>Rule Execution Pipeline</strong> (12 PRs)</summary> - elastic#247472 — Add alerting v2 Rule Executor (@darnautov) - elastic#248285 — Alerting v2 rule HTTP APIs (@darnautov) - elastic#248728 — Add basic alert actions route (@darnautov) - elastic#250161 — Refactor rule executor to use a pipeline pattern (@darnautov) - elastic#252292 — Implement the CountTimeframeStrategy for the director (@cnasikas) - elastic#252544 — Add support of streaming in the rule executor (@darnautov) - elastic#252754 — Update rule attributes (@kdelemme) - elastic#253355 — Add getRules client method (@kdelemme) - elastic#253668 — Make evaluation.query.condition optional (@kdelemme) - elastic#254031 — Add recovery event generation to rule execution pipeline (@kdelemme) - elastic#255968 — ES&elastic#124;QL views (@adcoelho) - elastic#256697 — Create episodes ES&elastic#124;QL view (@adcoelho) </details> <details> <summary><strong>Alert Suppression & Episodes</strong> (3 PRs)</summary> - elastic#252174 — Alert suppression (@kdelemme) - elastic#256486 — Fix suppression query (@kdelemme) - elastic#256527 — Store 'unmatched' action for unmatched alert episodes (@kdelemme) </details> <details> <summary><strong>Dispatcher & Notification Engine</strong> (6 PRs)</summary> - elastic#250822 — Alerting v2 dispatcher (@kdelemme) - elastic#251529 — Use query service in dispatcher (@kdelemme) - elastic#251679 — Dispatcher task (@kdelemme) - elastic#252758 — Dispatcher notification policy (@kdelemme) - elastic#255332 — Wait for resources before scheduling dispatcher task (@kdelemme) - elastic#256536 — Use stored encrypted API keys from Notification Policy in dispatcher step (@kdelemme) </details> <details> <summary><strong>Notification Policies (Server)</strong> (4 PRs)</summary> - elastic#251336 — Introduce notification policy CRUD APIs and client (@cnasikas) - elastic#253134 — Update notification policy (@cnasikas) - elastic#254808 — Store API key owner on Notification Policy (@kdelemme) - elastic#256940 — Make notification policies global with optional rule-label scoping (@kdelemme) </details> <details> <summary><strong>Notification Policies UI</strong> (1 PR)</summary> - elastic#255599 — Add notification policies UI and Storybook form story (@adcoelho) </details> <details> <summary><strong>Rule Authoring UI</strong> (13 PRs)</summary> - elastic#250961 — Add create rule flyout in Discover (@adcoelho) - elastic#255111 — Add activation configuration fields to alerting V2 rule form (@yiannisnikolopoulos) - elastic#255427 — Rule form: provide services via context (@dominiqueclarke) - elastic#255876 — MVP rule form, Split evaluation condition, and Recovery configuration (@dominiqueclarke) - elastic#256260 — Foundational rule list (@dominiqueclarke) - elastic#256756 — Wire up edit flow (@dominiqueclarke) - elastic#256801 — Move consecutive breaches max to shared constants (@yiannisnikolopoulos) - elastic#256818 — Preview query and design parity (@dominiqueclarke) - elastic#256938 — Allow clearing number inputs in state transition fields (@yiannisnikolopoulos) - elastic#257017 — Add enable/disable and clone rule to rule list (@dominiqueclarke) - elastic#257246 — Remove all React.FC (@dominiqueclarke) - elastic#257415 — Rule form - fix test (@dominiqueclarke) - elastic#257454 — Block comma key in number input component (@yiannisnikolopoulos) </details> <details> <summary><strong>API Documentation & Schema</strong> (2 PRs)</summary> - elastic#254901 — Rename indexes for alert events and actions (@adcoelho) - elastic#255810 — OAS for alert action routes (@adcoelho) </details> <details> <summary><strong>Observability & Monitoring</strong> (3 PRs)</summary> - elastic#254925 — Add ApmMiddleware to the rule executor (@adcoelho) - elastic#255115 — Add the withAPM decorator and apply it to the rules_client (@adcoelho) - elastic#255999 — Fix linting problem in apm middleware (@adcoelho) </details> <details> <summary><strong>CI & Maintenance</strong> (2 PRs)</summary> - elastic#257409 — Refactor SO services to use inversify DI for client initialization (@darnautov) - Fix alerting-v2-schema jest config (@darnautov) </details> --- --------- Co-authored-by: Dima Arnautov <dmitrii.arnautov@elastic.co> Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kevin Delemme <kevin.delemme@elastic.co> Co-authored-by: Mike Côté <mikecote@users.noreply.github.com> Co-authored-by: Antonio <antonio.coelho@elastic.co> Co-authored-by: Kevin Delemme <kdelemme@gmail.com> Co-authored-by: Dominique Clarke <dominique.clarke@elastic.co> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.rhodes@elastic.co> Co-authored-by: Yiannis Nikolopoulos <yiannis.nikolopoulos@elastic.co> Co-authored-by: Alexi Doak <109488926+doakalexi@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Bailey Cash <bailey.cash@elastic.co> Co-authored-by: Anna Davydova <ana.davydova@elastic.co> Co-authored-by: Umberto Pepato <umbopepato@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.matthew.rhodes@gmail.com> Co-authored-by: Joana Cardoso <169058851+joana-cps@users.noreply.github.com>
- **ES|QL-native rule evaluation** — Rules are defined as ES|QL queries with optional WHERE clause conditions, evaluated on a configurable schedule - **Alert lifecycle management** — Full episode tracking with pending → active → recovering → inactive state transitions, including configurable alert delay (consecutive breaches / duration) - **Event-driven architecture** — Alert events and actions are stored in dedicated data streams (`.alerting-events`, `.alerting-actions`) with ES|QL views for querying - **Notification dispatch pipeline** — A multi-step dispatcher that matches alert episodes to notification policies, handles throttling/suppression, and triggers Kibana Workflows using encrypted API keys - **Notification policies** — CRUD APIs and UI for creating notification policies with KQL-based rule matching, workflow integration, and API key management - **Rule authoring UI** — A shared rule form package (`@kbn/alerting-v2-rule-form`) usable standalone or embedded in Discover, with ES|QL editor, WHERE clause condition editing, recovery configuration, and live query preview - **Rule management UI** — Full rule list with pagination, enable/disable, clone, edit, and delete operations - **APM instrumentation** — Middleware and decorators for tracing rule execution and client operations - **InversifyJS DI** — All services use constructor injection with typed tokens, scoped per-request or singleton as appropriate - **Pipeline pattern** — Rule executor and dispatcher use composable step-based pipelines - **Saved Objects** — Rules stored as hidden saved objects; notification policies stored as encrypted saved objects (for API key protection) - **Feature privileges** — Dedicated Kibana feature with read/all privileges for RBAC --- <details> <summary><strong>Core Engine & Plugin Init</strong> (12 PRs)</summary> - elastic#247283 — Init alerting v2 plugin (@cnasikas) - elastic#247452 — Add the alerting v2 feature privileges (@cnasikas) - elastic#247673 — Director (@cnasikas) - elastic#248306 — Create basic services (@cnasikas) - elastic#248696 — Initialize all resources (@cnasikas) - elastic#250023 — Schema package (@cnasikas) - elastic#250010 — YML Editor (@cnasikas) - elastic#251064 — Remove index.mode: lookup for RnA alert indices (@cnasikas) - elastic#251707 — Simplify task registration pattern (@kdelemme) - elastic#251876 — Dedicated user service (@cnasikas) - elastic#252073 — Use `kbn/data-streams` in alerting_v2 (@cnasikas) - elastic#255120 — Update alerting-v2 owner to new rna project team (@cnasikas) </details> <details> <summary><strong>Rule Execution Pipeline</strong> (12 PRs)</summary> - elastic#247472 — Add alerting v2 Rule Executor (@darnautov) - elastic#248285 — Alerting v2 rule HTTP APIs (@darnautov) - elastic#248728 — Add basic alert actions route (@darnautov) - elastic#250161 — Refactor rule executor to use a pipeline pattern (@darnautov) - elastic#252292 — Implement the CountTimeframeStrategy for the director (@cnasikas) - elastic#252544 — Add support of streaming in the rule executor (@darnautov) - elastic#252754 — Update rule attributes (@kdelemme) - elastic#253355 — Add getRules client method (@kdelemme) - elastic#253668 — Make evaluation.query.condition optional (@kdelemme) - elastic#254031 — Add recovery event generation to rule execution pipeline (@kdelemme) - elastic#255968 — ES&elastic#124;QL views (@adcoelho) - elastic#256697 — Create episodes ES&elastic#124;QL view (@adcoelho) </details> <details> <summary><strong>Alert Suppression & Episodes</strong> (3 PRs)</summary> - elastic#252174 — Alert suppression (@kdelemme) - elastic#256486 — Fix suppression query (@kdelemme) - elastic#256527 — Store 'unmatched' action for unmatched alert episodes (@kdelemme) </details> <details> <summary><strong>Dispatcher & Notification Engine</strong> (6 PRs)</summary> - elastic#250822 — Alerting v2 dispatcher (@kdelemme) - elastic#251529 — Use query service in dispatcher (@kdelemme) - elastic#251679 — Dispatcher task (@kdelemme) - elastic#252758 — Dispatcher notification policy (@kdelemme) - elastic#255332 — Wait for resources before scheduling dispatcher task (@kdelemme) - elastic#256536 — Use stored encrypted API keys from Notification Policy in dispatcher step (@kdelemme) </details> <details> <summary><strong>Notification Policies (Server)</strong> (4 PRs)</summary> - elastic#251336 — Introduce notification policy CRUD APIs and client (@cnasikas) - elastic#253134 — Update notification policy (@cnasikas) - elastic#254808 — Store API key owner on Notification Policy (@kdelemme) - elastic#256940 — Make notification policies global with optional rule-label scoping (@kdelemme) </details> <details> <summary><strong>Notification Policies UI</strong> (1 PR)</summary> - elastic#255599 — Add notification policies UI and Storybook form story (@adcoelho) </details> <details> <summary><strong>Rule Authoring UI</strong> (13 PRs)</summary> - elastic#250961 — Add create rule flyout in Discover (@adcoelho) - elastic#255111 — Add activation configuration fields to alerting V2 rule form (@yiannisnikolopoulos) - elastic#255427 — Rule form: provide services via context (@dominiqueclarke) - elastic#255876 — MVP rule form, Split evaluation condition, and Recovery configuration (@dominiqueclarke) - elastic#256260 — Foundational rule list (@dominiqueclarke) - elastic#256756 — Wire up edit flow (@dominiqueclarke) - elastic#256801 — Move consecutive breaches max to shared constants (@yiannisnikolopoulos) - elastic#256818 — Preview query and design parity (@dominiqueclarke) - elastic#256938 — Allow clearing number inputs in state transition fields (@yiannisnikolopoulos) - elastic#257017 — Add enable/disable and clone rule to rule list (@dominiqueclarke) - elastic#257246 — Remove all React.FC (@dominiqueclarke) - elastic#257415 — Rule form - fix test (@dominiqueclarke) - elastic#257454 — Block comma key in number input component (@yiannisnikolopoulos) </details> <details> <summary><strong>API Documentation & Schema</strong> (2 PRs)</summary> - elastic#254901 — Rename indexes for alert events and actions (@adcoelho) - elastic#255810 — OAS for alert action routes (@adcoelho) </details> <details> <summary><strong>Observability & Monitoring</strong> (3 PRs)</summary> - elastic#254925 — Add ApmMiddleware to the rule executor (@adcoelho) - elastic#255115 — Add the withAPM decorator and apply it to the rules_client (@adcoelho) - elastic#255999 — Fix linting problem in apm middleware (@adcoelho) </details> <details> <summary><strong>CI & Maintenance</strong> (2 PRs)</summary> - elastic#257409 — Refactor SO services to use inversify DI for client initialization (@darnautov) - Fix alerting-v2-schema jest config (@darnautov) </details> --- --------- Co-authored-by: Dima Arnautov <dmitrii.arnautov@elastic.co> Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kevin Delemme <kevin.delemme@elastic.co> Co-authored-by: Mike Côté <mikecote@users.noreply.github.com> Co-authored-by: Antonio <antonio.coelho@elastic.co> Co-authored-by: Kevin Delemme <kdelemme@gmail.com> Co-authored-by: Dominique Clarke <dominique.clarke@elastic.co> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.rhodes@elastic.co> Co-authored-by: Yiannis Nikolopoulos <yiannis.nikolopoulos@elastic.co> Co-authored-by: Alexi Doak <109488926+doakalexi@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Bailey Cash <bailey.cash@elastic.co> Co-authored-by: Anna Davydova <ana.davydova@elastic.co> Co-authored-by: Umberto Pepato <umbopepato@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.matthew.rhodes@gmail.com> Co-authored-by: Joana Cardoso <169058851+joana-cps@users.noreply.github.com>
- **ES|QL-native rule evaluation** — Rules are defined as ES|QL queries with optional WHERE clause conditions, evaluated on a configurable schedule - **Alert lifecycle management** — Full episode tracking with pending → active → recovering → inactive state transitions, including configurable alert delay (consecutive breaches / duration) - **Event-driven architecture** — Alert events and actions are stored in dedicated data streams (`.alerting-events`, `.alerting-actions`) with ES|QL views for querying - **Notification dispatch pipeline** — A multi-step dispatcher that matches alert episodes to notification policies, handles throttling/suppression, and triggers Kibana Workflows using encrypted API keys - **Notification policies** — CRUD APIs and UI for creating notification policies with KQL-based rule matching, workflow integration, and API key management - **Rule authoring UI** — A shared rule form package (`@kbn/alerting-v2-rule-form`) usable standalone or embedded in Discover, with ES|QL editor, WHERE clause condition editing, recovery configuration, and live query preview - **Rule management UI** — Full rule list with pagination, enable/disable, clone, edit, and delete operations - **APM instrumentation** — Middleware and decorators for tracing rule execution and client operations - **InversifyJS DI** — All services use constructor injection with typed tokens, scoped per-request or singleton as appropriate - **Pipeline pattern** — Rule executor and dispatcher use composable step-based pipelines - **Saved Objects** — Rules stored as hidden saved objects; notification policies stored as encrypted saved objects (for API key protection) - **Feature privileges** — Dedicated Kibana feature with read/all privileges for RBAC --- <details> <summary><strong>Core Engine & Plugin Init</strong> (12 PRs)</summary> - elastic#247283 — Init alerting v2 plugin (@cnasikas) - elastic#247452 — Add the alerting v2 feature privileges (@cnasikas) - elastic#247673 — Director (@cnasikas) - elastic#248306 — Create basic services (@cnasikas) - elastic#248696 — Initialize all resources (@cnasikas) - elastic#250023 — Schema package (@cnasikas) - elastic#250010 — YML Editor (@cnasikas) - elastic#251064 — Remove index.mode: lookup for RnA alert indices (@cnasikas) - elastic#251707 — Simplify task registration pattern (@kdelemme) - elastic#251876 — Dedicated user service (@cnasikas) - elastic#252073 — Use `kbn/data-streams` in alerting_v2 (@cnasikas) - elastic#255120 — Update alerting-v2 owner to new rna project team (@cnasikas) </details> <details> <summary><strong>Rule Execution Pipeline</strong> (12 PRs)</summary> - elastic#247472 — Add alerting v2 Rule Executor (@darnautov) - elastic#248285 — Alerting v2 rule HTTP APIs (@darnautov) - elastic#248728 — Add basic alert actions route (@darnautov) - elastic#250161 — Refactor rule executor to use a pipeline pattern (@darnautov) - elastic#252292 — Implement the CountTimeframeStrategy for the director (@cnasikas) - elastic#252544 — Add support of streaming in the rule executor (@darnautov) - elastic#252754 — Update rule attributes (@kdelemme) - elastic#253355 — Add getRules client method (@kdelemme) - elastic#253668 — Make evaluation.query.condition optional (@kdelemme) - elastic#254031 — Add recovery event generation to rule execution pipeline (@kdelemme) - elastic#255968 — ES&elastic#124;QL views (@adcoelho) - elastic#256697 — Create episodes ES&elastic#124;QL view (@adcoelho) </details> <details> <summary><strong>Alert Suppression & Episodes</strong> (3 PRs)</summary> - elastic#252174 — Alert suppression (@kdelemme) - elastic#256486 — Fix suppression query (@kdelemme) - elastic#256527 — Store 'unmatched' action for unmatched alert episodes (@kdelemme) </details> <details> <summary><strong>Dispatcher & Notification Engine</strong> (6 PRs)</summary> - elastic#250822 — Alerting v2 dispatcher (@kdelemme) - elastic#251529 — Use query service in dispatcher (@kdelemme) - elastic#251679 — Dispatcher task (@kdelemme) - elastic#252758 — Dispatcher notification policy (@kdelemme) - elastic#255332 — Wait for resources before scheduling dispatcher task (@kdelemme) - elastic#256536 — Use stored encrypted API keys from Notification Policy in dispatcher step (@kdelemme) </details> <details> <summary><strong>Notification Policies (Server)</strong> (4 PRs)</summary> - elastic#251336 — Introduce notification policy CRUD APIs and client (@cnasikas) - elastic#253134 — Update notification policy (@cnasikas) - elastic#254808 — Store API key owner on Notification Policy (@kdelemme) - elastic#256940 — Make notification policies global with optional rule-label scoping (@kdelemme) </details> <details> <summary><strong>Notification Policies UI</strong> (1 PR)</summary> - elastic#255599 — Add notification policies UI and Storybook form story (@adcoelho) </details> <details> <summary><strong>Rule Authoring UI</strong> (13 PRs)</summary> - elastic#250961 — Add create rule flyout in Discover (@adcoelho) - elastic#255111 — Add activation configuration fields to alerting V2 rule form (@yiannisnikolopoulos) - elastic#255427 — Rule form: provide services via context (@dominiqueclarke) - elastic#255876 — MVP rule form, Split evaluation condition, and Recovery configuration (@dominiqueclarke) - elastic#256260 — Foundational rule list (@dominiqueclarke) - elastic#256756 — Wire up edit flow (@dominiqueclarke) - elastic#256801 — Move consecutive breaches max to shared constants (@yiannisnikolopoulos) - elastic#256818 — Preview query and design parity (@dominiqueclarke) - elastic#256938 — Allow clearing number inputs in state transition fields (@yiannisnikolopoulos) - elastic#257017 — Add enable/disable and clone rule to rule list (@dominiqueclarke) - elastic#257246 — Remove all React.FC (@dominiqueclarke) - elastic#257415 — Rule form - fix test (@dominiqueclarke) - elastic#257454 — Block comma key in number input component (@yiannisnikolopoulos) </details> <details> <summary><strong>API Documentation & Schema</strong> (2 PRs)</summary> - elastic#254901 — Rename indexes for alert events and actions (@adcoelho) - elastic#255810 — OAS for alert action routes (@adcoelho) </details> <details> <summary><strong>Observability & Monitoring</strong> (3 PRs)</summary> - elastic#254925 — Add ApmMiddleware to the rule executor (@adcoelho) - elastic#255115 — Add the withAPM decorator and apply it to the rules_client (@adcoelho) - elastic#255999 — Fix linting problem in apm middleware (@adcoelho) </details> <details> <summary><strong>CI & Maintenance</strong> (2 PRs)</summary> - elastic#257409 — Refactor SO services to use inversify DI for client initialization (@darnautov) - Fix alerting-v2-schema jest config (@darnautov) </details> --- --------- Co-authored-by: Dima Arnautov <dmitrii.arnautov@elastic.co> Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kevin Delemme <kevin.delemme@elastic.co> Co-authored-by: Mike Côté <mikecote@users.noreply.github.com> Co-authored-by: Antonio <antonio.coelho@elastic.co> Co-authored-by: Kevin Delemme <kdelemme@gmail.com> Co-authored-by: Dominique Clarke <dominique.clarke@elastic.co> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.rhodes@elastic.co> Co-authored-by: Yiannis Nikolopoulos <yiannis.nikolopoulos@elastic.co> Co-authored-by: Alexi Doak <109488926+doakalexi@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Bailey Cash <bailey.cash@elastic.co> Co-authored-by: Anna Davydova <ana.davydova@elastic.co> Co-authored-by: Umberto Pepato <umbopepato@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.matthew.rhodes@gmail.com> Co-authored-by: Joana Cardoso <169058851+joana-cps@users.noreply.github.com>
## Summary ### Key capabilities - **ES|QL-native rule evaluation** — Rules are defined as ES|QL queries with optional WHERE clause conditions, evaluated on a configurable schedule - **Alert lifecycle management** — Full episode tracking with pending → active → recovering → inactive state transitions, including configurable alert delay (consecutive breaches / duration) - **Event-driven architecture** — Alert events and actions are stored in dedicated data streams (`.alerting-events`, `.alerting-actions`) with ES|QL views for querying - **Notification dispatch pipeline** — A multi-step dispatcher that matches alert episodes to notification policies, handles throttling/suppression, and triggers Kibana Workflows using encrypted API keys - **Notification policies** — CRUD APIs and UI for creating notification policies with KQL-based rule matching, workflow integration, and API key management - **Rule authoring UI** — A shared rule form package (`@kbn/alerting-v2-rule-form`) usable standalone or embedded in Discover, with ES|QL editor, WHERE clause condition editing, recovery configuration, and live query preview - **Rule management UI** — Full rule list with pagination, enable/disable, clone, edit, and delete operations - **APM instrumentation** — Middleware and decorators for tracing rule execution and client operations ### Architecture highlights - **InversifyJS DI** — All services use constructor injection with typed tokens, scoped per-request or singleton as appropriate - **Pipeline pattern** — Rule executor and dispatcher use composable step-based pipelines - **Saved Objects** — Rules stored as hidden saved objects; notification policies stored as encrypted saved objects (for API key protection) - **Feature privileges** — Dedicated Kibana feature with read/all privileges for RBAC --- ## Contained PRs <details> <summary><strong>Core Engine & Plugin Init</strong> (12 PRs)</summary> - elastic#247283 — Init alerting v2 plugin (@cnasikas) - elastic#247452 — Add the alerting v2 feature privileges (@cnasikas) - elastic#247673 — Director (@cnasikas) - elastic#248306 — Create basic services (@cnasikas) - elastic#248696 — Initialize all resources (@cnasikas) - elastic#250023 — Schema package (@cnasikas) - elastic#250010 — YML Editor (@cnasikas) - elastic#251064 — Remove index.mode: lookup for RnA alert indices (@cnasikas) - elastic#251707 — Simplify task registration pattern (@kdelemme) - elastic#251876 — Dedicated user service (@cnasikas) - elastic#252073 — Use `kbn/data-streams` in alerting_v2 (@cnasikas) - elastic#255120 — Update alerting-v2 owner to new rna project team (@cnasikas) </details> <details> <summary><strong>Rule Execution Pipeline</strong> (12 PRs)</summary> - elastic#247472 — Add alerting v2 Rule Executor (@darnautov) - elastic#248285 — Alerting v2 rule HTTP APIs (@darnautov) - elastic#248728 — Add basic alert actions route (@darnautov) - elastic#250161 — Refactor rule executor to use a pipeline pattern (@darnautov) - elastic#252292 — Implement the CountTimeframeStrategy for the director (@cnasikas) - elastic#252544 — Add support of streaming in the rule executor (@darnautov) - elastic#252754 — Update rule attributes (@kdelemme) - elastic#253355 — Add getRules client method (@kdelemme) - elastic#253668 — Make evaluation.query.condition optional (@kdelemme) - elastic#254031 — Add recovery event generation to rule execution pipeline (@kdelemme) - elastic#255968 — ES&elastic#124;QL views (@adcoelho) - elastic#256697 — Create episodes ES&elastic#124;QL view (@adcoelho) </details> <details> <summary><strong>Alert Suppression & Episodes</strong> (3 PRs)</summary> - elastic#252174 — Alert suppression (@kdelemme) - elastic#256486 — Fix suppression query (@kdelemme) - elastic#256527 — Store 'unmatched' action for unmatched alert episodes (@kdelemme) </details> <details> <summary><strong>Dispatcher & Notification Engine</strong> (6 PRs)</summary> - elastic#250822 — Alerting v2 dispatcher (@kdelemme) - elastic#251529 — Use query service in dispatcher (@kdelemme) - elastic#251679 — Dispatcher task (@kdelemme) - elastic#252758 — Dispatcher notification policy (@kdelemme) - elastic#255332 — Wait for resources before scheduling dispatcher task (@kdelemme) - elastic#256536 — Use stored encrypted API keys from Notification Policy in dispatcher step (@kdelemme) </details> <details> <summary><strong>Notification Policies (Server)</strong> (4 PRs)</summary> - elastic#251336 — Introduce notification policy CRUD APIs and client (@cnasikas) - elastic#253134 — Update notification policy (@cnasikas) - elastic#254808 — Store API key owner on Notification Policy (@kdelemme) - elastic#256940 — Make notification policies global with optional rule-label scoping (@kdelemme) </details> <details> <summary><strong>Notification Policies UI</strong> (1 PR)</summary> - elastic#255599 — Add notification policies UI and Storybook form story (@adcoelho) </details> <details> <summary><strong>Rule Authoring UI</strong> (13 PRs)</summary> - elastic#250961 — Add create rule flyout in Discover (@adcoelho) - elastic#255111 — Add activation configuration fields to alerting V2 rule form (@yiannisnikolopoulos) - elastic#255427 — Rule form: provide services via context (@dominiqueclarke) - elastic#255876 — MVP rule form, Split evaluation condition, and Recovery configuration (@dominiqueclarke) - elastic#256260 — Foundational rule list (@dominiqueclarke) - elastic#256756 — Wire up edit flow (@dominiqueclarke) - elastic#256801 — Move consecutive breaches max to shared constants (@yiannisnikolopoulos) - elastic#256818 — Preview query and design parity (@dominiqueclarke) - elastic#256938 — Allow clearing number inputs in state transition fields (@yiannisnikolopoulos) - elastic#257017 — Add enable/disable and clone rule to rule list (@dominiqueclarke) - elastic#257246 — Remove all React.FC (@dominiqueclarke) - elastic#257415 — Rule form - fix test (@dominiqueclarke) - elastic#257454 — Block comma key in number input component (@yiannisnikolopoulos) </details> <details> <summary><strong>API Documentation & Schema</strong> (2 PRs)</summary> - elastic#254901 — Rename indexes for alert events and actions (@adcoelho) - elastic#255810 — OAS for alert action routes (@adcoelho) </details> <details> <summary><strong>Observability & Monitoring</strong> (3 PRs)</summary> - elastic#254925 — Add ApmMiddleware to the rule executor (@adcoelho) - elastic#255115 — Add the withAPM decorator and apply it to the rules_client (@adcoelho) - elastic#255999 — Fix linting problem in apm middleware (@adcoelho) </details> <details> <summary><strong>CI & Maintenance</strong> (2 PRs)</summary> - elastic#257409 — Refactor SO services to use inversify DI for client initialization (@darnautov) - Fix alerting-v2-schema jest config (@darnautov) </details> --- --------- Co-authored-by: Dima Arnautov <dmitrii.arnautov@elastic.co> Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kevin Delemme <kevin.delemme@elastic.co> Co-authored-by: Mike Côté <mikecote@users.noreply.github.com> Co-authored-by: Antonio <antonio.coelho@elastic.co> Co-authored-by: Kevin Delemme <kdelemme@gmail.com> Co-authored-by: Dominique Clarke <dominique.clarke@elastic.co> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.rhodes@elastic.co> Co-authored-by: Yiannis Nikolopoulos <yiannis.nikolopoulos@elastic.co> Co-authored-by: Alexi Doak <109488926+doakalexi@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Bailey Cash <bailey.cash@elastic.co> Co-authored-by: Anna Davydova <ana.davydova@elastic.co> Co-authored-by: Umberto Pepato <umbopepato@users.noreply.github.com> Co-authored-by: Jason Rhodes <jason.matthew.rhodes@gmail.com> Co-authored-by: Joana Cardoso <169058851+joana-cps@users.noreply.github.com>
Summary
Note
Dear reviewers. This PR is getting merged into a feature branch. Only the ResponseOps review is needed it at the moment. We will request for your review when we open the feature branch PR to be merged on
main.Why Async Generators?
While classic streams are powerful for raw binary I/O, our use case involves structured data (Apache Arrow) and Object-based post-processing (JSON). I, and discussing with various LLMs a lot, decided to use Async Generators for the following reasons:
Per object processing
Classic streams often require manual configuration of
objectMode: trueandhighWaterMark. Async Generators treat every yield as a discrete object, making them naturally suited for processing a batch of JSON documents without the overhead of buffer management. We need to wait for ES to at least emit one full object before starting processing.Push vs. pull and declarative backpressure
The fundamental difference between the "push" and "pull" models lies in who controls the pace of data. In a classic push model (standard Streams), the source emits data as fast as it can, and each stage must explicitly tell the previous one to "stop" (via
pause()or the drain event) if it gets overwhelmed.In contrast, async generators use a pull model, where the consumer (the final destination) asks the previous step for a chunk of data only when it has the capacity to process it. This request propagates back up the chain to the source. Because the code literally awaits the next chunk, backpressure happens automatically. This makes the system more stable, as a producer can't outrun a consumer.
Batch processing per generator
Each generator can define its own pace of processing data before yielding to the next generator. The generator can use a temporal buffer, accumulating individual objects into a larger collection (a "batch") before yielding them. This is useful if a step in the chain needs to be processed at a slower or faster pace than the rest of the steps. For example, the director cannot do expensive queries that rely on the number of unique episode IDs. Also, the storage service may want to collect more records before doing a write call to ES to avoid overwhelming ES with a lot of small writes.
Improved error handling & cleanup
By using stream/promises.pipeline, we ensure that if a post-processing step fails or an Elasticsearch write times out, all underlying streams are automatically destroyed and memory is cleared immediately. The error is propagated via a standard Promise catch block, rather than
on('error')listeners make it easier to reason and debug errors.Execution Context & Cancellation
Rule execution is a multi-step streaming pipeline where each step (query, alert event creation, director, storage) processes data incrementally through async generators. Because execution can be cancelled at any time (e.g., due to a timeout), we need a structured way to propagate cancellation across all steps. The
ExecutionContextwraps a standardAbortSignaland provides three capabilities:throwIfAborted()for cancellation checks at execution points (loop boundaries, before/after I/O),onAbort()for reactive cleanup when the signal fires, andcreateScope()for grouping disposable resources (open streams, ES|QL readers, in-memory state) that must be released on cancellation.CancellationScopeensures all registered disposers run even if one throws, and theCancellationBoundaryMiddlewareenforces cancellation checks at every step boundary in the pipeline so that individual steps don't need to duplicate that logic. Together, these pieces ensure that a cancelled execution tears down cleanly — ES connections are closed, intermediate state is freed, and no unhandled rejections are left behind.Checklist
Check the PR satisfies following conditions.
Reviewers should verify this PR satisfies this list as well.