Streams rally track: Set up streams through Kibana#254747
Conversation
|
Pinging @elastic/obs-onboarding-team (Team:obs-onboarding) |
flash1293
left a comment
There was a problem hiding this comment.
Looks pretty good @nikita-lavrov , thanks! Left some comments, but it's close.
|
|
||
| export const configSchema = schema.object({}); | ||
| export const configSchema = schema.object({ | ||
| preconfigured: schema.object({ |
There was a problem hiding this comment.
Not sure whether this exists, but can we somehow mark this as tech preview?
| export const configSchema = schema.object({ | ||
| preconfigured: schema.object({ | ||
| enabled: schema.boolean({ defaultValue: false }), | ||
| forks: schema.arrayOf( |
There was a problem hiding this comment.
The thing with forks vs streams feels kinda weird to me, can we somehow keep this closer to the regular shape of streams configurations?
|
|
||
| // Use security API to check if user has all required permissions | ||
| const securityClient = this.dependencies.scopedClusterClient.asCurrentUser.security; | ||
| const securityClient = this.dependencies.currentUser.security; |
There was a problem hiding this comment.
It's a bit verbose in the diff that you had to change this everywhere, but I actually like it! This makes it much harder to accidentally use the wrong client.
Only nit - currentUser doesn't make it clear this is about the es client, that should be somehow in the name
| // Since the RulesClient cannot be unscoped, we provide an empty object that will | ||
| // throw an error if dashboards, rules, or queries exist in the stream definition. | ||
| // This is a limitation of the config-based streams for now. | ||
| const rulesClient = {} as RulesClient; |
There was a problem hiding this comment.
Could you mock this more explicitly with method stubs that throw a descriptive error? Otherwise things fail with a "can't call undefined" error that doesn't tell us much
|
|
||
| await streamsClient.enableStreams(); | ||
|
|
||
| for (const { name, parent, where, status } of this.config.preconfigured.forks) { |
There was a problem hiding this comment.
It's OK if the config schema itself doesn't catch this (since we don't want to duplicate the streams schema in kbn/config), but in here we should use the schema parsing functions to make sure the data coming in has the right shape and fail with the standard zod error if that's not the case.
|
|
||
| for (const { name, parent, where, status } of this.config.preconfigured.forks) { | ||
| if (!(await streamsClient.existsStream(name))) { | ||
| await streamsClient.forkStream({ |
There was a problem hiding this comment.
As mentioned in the config schema file, if the config is just a list of streams definition, this is simplified and you can just pass the config to bulkUpsert and be done
flash1293
left a comment
There was a problem hiding this comment.
This is really good, works super well! I have one comment in the code and one idea what we could improve - right now you need to repeat the full definition for the root streams because otherwise it will fail:
wired:
fields:
'[@timestamp]':
type: date
'[stream.name]':
type: system
'[scope.name]':
type: keyword
ignore_above: 1024
'[host.name]':
type: keyword
ignore_above: 1024
'[trace.id]':
type: keyword
ignore_above: 1024
'[span.id]':
type: keyword
ignore_above: 1024
'[service.name]':
type: keyword
ignore_above: 1024
message:
type: match_only_text
'[log.level]':
type: keyword
ignore_above: 1024
routing:
This is verbose, but the bigger problem is that once we use that in the benchmark, it will start failing once we e.g. add a field here. Could you change the logic to spread in the defaults for the root streams in particular so you can omit them in the config file and focus on the stuff you actually want to add? This will make our lives easier down the line
| } | ||
|
|
||
| if (this.config.preconfigured.enabled) { | ||
| void core.getStartServices().then(async ([coreStart]) => { |
There was a problem hiding this comment.
What is this void here doing? Is this is suppress the warning about unhandled promises? If yes, we should probably do a little explanation why it's OK in this spot. Also, let's catch exceptions in this promise chain and at least log them so there is some indication of what happened.
|
/ci |
…rack-set-up-streams-through-kibana
flash1293
left a comment
There was a problem hiding this comment.
Just some nits, then it should be good to go
| request: Streams.all.UpsertRequest.parse(definition), | ||
| }; | ||
| }) | ||
| ); |
There was a problem hiding this comment.
Could we add a success log as well? Otherwise it's a bit magic
| esClientAsInternalUser: esClient, | ||
| }); | ||
|
|
||
| await streamsClient.enableStreams(); |
There was a problem hiding this comment.
We discussed that a similar use case in the fleet plugin has retry logic in place because it's possible Elasticsearch isn't fully ready yet in automated scenarios. Could we add that?
| await streamsClient.bulkUpsert( | ||
| this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => { | ||
| if (ROOT_STREAM_NAMES.includes(name)) { | ||
| definition.stream.ingest.wired.fields = |
There was a problem hiding this comment.
nit: This is mutating the root config object, which is something we shouldn't do. Either spread all the layers or do a deep clone. Otherwise another consumer reading this object sees the change from here as well (or not, based on the timing, it's a bug waiting to happen)
📝 WalkthroughWalkthroughThis PR refactors the Streams plugin to eliminate KibanaRequest dependencies and replace IScopedClusterClient with ElasticsearchClient throughout. Service methods are renamed from Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts (1)
98-105:⚠️ Potential issue | 🟠 MajorDon't bypass read checks for query-stream hits.
storageClient.search()still runs as the internal user, so this branch returns any matching query stream as soon as the UI setting is enabled, even whencheckAccessBulksays the caller cannot read it. That leaks stream titles and URLs in global search.🔐 Suggested fix
- const hitsWithAccess = searchResponse.hits.hits.filter((hit) => { - if (Streams.QueryStream.Definition.is(hit._source)) return queryStreamsEnabled; - return privileges[hit._source.name]?.read === true; - }); + const hitsWithAccess = hits.filter((hit) => { + const canRead = privileges[hit._source.name]?.read === true; + if (Streams.QueryStream.Definition.is(hit._source)) { + return queryStreamsEnabled && canRead; + } + return canRead; + });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts` around lines 98 - 105, The filter for hitsWithAccess currently bypasses checkAccessBulk for query streams (Streams.QueryStream.Definition.is) when queryStreamsEnabled is true, leaking titles/URLs; update the hitsWithAccess predicate to always consult the privileges map from checkAccessBulk (privileges[name]?.read === true) and only allow query streams when both queryStreamsEnabled is true AND the privilege check passes (use the hit._source.name to look up privileges); modify the logic in the hitsWithAccess filter (the block using Streams.QueryStream.Definition.is, queryStreamsEnabled, and privileges) so no branch returns true without verifying privileges from checkAccessBulk.
♻️ Duplicate comments (1)
x-pack/platform/plugins/shared/streams/server/plugin.ts (1)
275-324:⚠️ Potential issue | 🟠 MajorValidate unsupported alerting config before using the fake
RulesClient.This startup path casts a three-method stub to
RulesClient, so unsupportedrules/queriesonly fail once the preconfiguration sync is already running. If reconciliation touches any otherRulesClientmember, bootstrap can die mid-sync with a generic runtime error. Reject those shapes beforeenableStreams/bulkUpsert, and make the fallback client throw a descriptive error for any accessed member.Run the following script to verify which
RulesClientmembers the streams code can hit and whether the preconfigured path rejects unsupported config before side effects begin:#!/bin/bash set -euo pipefail echo "RulesClient member usage under streams:" rg -n --type=ts -C2 '\brulesClient\.[A-Za-z_][A-Za-z0-9_]*\b' \ x-pack/platform/plugins/shared/streams/server/lib/streams \ x-pack/platform/plugins/shared/streams/server/plugin.ts echo echo "Preconfigured startup branch:" sed -n '266,329p' x-pack/platform/plugins/shared/streams/server/plugin.tsExpected result: only members intentionally supported by the stub are reachable, and unsupported
rules/queriesare rejected beforebulkUpsertstarts. As per coding guidelines, "avoidanyandunknown."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/platform/plugins/shared/streams/server/plugin.ts` around lines 275 - 324, The current stubbed RulesClient (the object cast to RulesClient) only implements bulkGetRules/create/update and is used when building attachmentService.getClient, queryService.getClient and streamsService.getClient, which risks runtime failures during streamsClient.enableStreams or streamsClient.bulkUpsert; instead, validate the preconfigured.rules/queries shapes up-front (before calling enableStreams or bulkUpsert on streamsClient) and replace the stub with a defensive proxy that throws a clear, descriptive Error for any accessed/unimplemented member; locate the RulesClient stub in plugin.ts, add a validation step that inspects this.config.preconfigured.stream_definitions (and any rules/queries entries) and rejects invalid shapes before constructing attachmentClient/featureClient/queryClient, and implement the fallback RulesClient as a Proxy or wrapper that throws "Unsupported RulesClient method: <methodName>" for any property access other than the explicitly supported methods (bulkGetRules, create, update).
🧹 Nitpick comments (2)
x-pack/platform/plugins/shared/streams/common/config.ts (1)
14-14: Prefer a typed schema overschema.any()forstream_definitions.Using
schema.any()bypasses config validation entirely, allowing malformed stream definitions to slip through until runtime. Since stream definitions have a known structure (per PR notes requiring "full definitions" for nested items), consider defining a proper schema—even a minimal one—to catch configuration errors at startup.If the shape is intentionally flexible during this initial implementation, at minimum document the expected structure inline or consider marking this configuration as experimental/tech preview.
As per coding guidelines: "Use TypeScript for all new code; avoid
anyandunknown."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/platform/plugins/shared/streams/common/config.ts` at line 14, Replace the untyped schema.any() used for stream_definitions with a concrete schema that enforces the known stream definition shape: change stream_definitions to schema.arrayOf(schema.object(...)) and validate required top-level properties (e.g., id/name/type as strings, an optional description, and a nested items array/object with its own minimal schema) so malformed configs fail at startup; update the symbol stream_definitions in config.ts and add inline comments documenting the expected fields and mark it experimental if the shape may change.x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts (1)
37-50: Add one regression case for the internal-user client path.These tests still exercise the helper through
asCurrentUser, so they don't pin the startup path this refactor adds. Switching one happy-path case toasInternalUserwould make it much harder to accidentally reintroduce a request-scoped assumption later.🧪 Minimal test tweak
- clusterClient.asCurrentUser.indices.getIndexTemplate.mockImplementationOnce(async () => ({ + clusterClient.asInternalUser.indices.getIndexTemplate.mockImplementationOnce(async () => ({ index_templates: [ { name: 'my-template', index_template: { composed_of: [], index_patterns: 'my-*', }, }, ], })); - await translateClassicStreamPipelineActions(actionsByType, clusterClient.asCurrentUser); + await translateClassicStreamPipelineActions(actionsByType, clusterClient.asInternalUser);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts` around lines 37 - 50, Add a regression test that exercises the internal-user code path by calling translateClassicStreamPipelineActions with clusterClient.asInternalUser instead of asCurrentUser; specifically, duplicate the existing happy-path setup (mocking clusterClient.asInternalUser.indices.getIndexTemplate to return the same index_templates payload) and invoke translateClassicStreamPipelineActions(actionsByType, clusterClient.asInternalUser) so the test covers the internal-user client path and prevents reintroducing request-scoped assumptions in translateClassicStreamPipelineActions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@x-pack/platform/plugins/shared/streams/server/plugin.ts`:
- Around line 304-317: The current mapping of
this.config.preconfigured.stream_definitions for ROOT_STREAM_NAMES (in
plugin.ts) overwrites stream.ingest.wired.fields with baseFields/ecsBaseFields,
silently discarding any user-configured root fields; update the logic in the
mapping that calls Streams.all.UpsertRequest.parse so that for root streams
(identified by ROOT_STREAM_NAMES and LOGS_ECS_STREAM_NAME) you merge existing
definition.stream.ingest.wired.fields with the defaults
(baseFields/ecsBaseFields) instead of replacing them, or alternatively detect
and throw a validation error if the incoming definition contains custom root
fields — apply the merge/validation in the same mapping before calling
Streams.all.UpsertRequest.parse so parsing receives the corrected object.
---
Outside diff comments:
In
`@x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts`:
- Around line 98-105: The filter for hitsWithAccess currently bypasses
checkAccessBulk for query streams (Streams.QueryStream.Definition.is) when
queryStreamsEnabled is true, leaking titles/URLs; update the hitsWithAccess
predicate to always consult the privileges map from checkAccessBulk
(privileges[name]?.read === true) and only allow query streams when both
queryStreamsEnabled is true AND the privilege check passes (use the
hit._source.name to look up privileges); modify the logic in the hitsWithAccess
filter (the block using Streams.QueryStream.Definition.is, queryStreamsEnabled,
and privileges) so no branch returns true without verifying privileges from
checkAccessBulk.
---
Duplicate comments:
In `@x-pack/platform/plugins/shared/streams/server/plugin.ts`:
- Around line 275-324: The current stubbed RulesClient (the object cast to
RulesClient) only implements bulkGetRules/create/update and is used when
building attachmentService.getClient, queryService.getClient and
streamsService.getClient, which risks runtime failures during
streamsClient.enableStreams or streamsClient.bulkUpsert; instead, validate the
preconfigured.rules/queries shapes up-front (before calling enableStreams or
bulkUpsert on streamsClient) and replace the stub with a defensive proxy that
throws a clear, descriptive Error for any accessed/unimplemented member; locate
the RulesClient stub in plugin.ts, add a validation step that inspects
this.config.preconfigured.stream_definitions (and any rules/queries entries) and
rejects invalid shapes before constructing
attachmentClient/featureClient/queryClient, and implement the fallback
RulesClient as a Proxy or wrapper that throws "Unsupported RulesClient method:
<methodName>" for any property access other than the explicitly supported
methods (bulkGetRules, create, update).
---
Nitpick comments:
In `@x-pack/platform/plugins/shared/streams/common/config.ts`:
- Line 14: Replace the untyped schema.any() used for stream_definitions with a
concrete schema that enforces the known stream definition shape: change
stream_definitions to schema.arrayOf(schema.object(...)) and validate required
top-level properties (e.g., id/name/type as strings, an optional description,
and a nested items array/object with its own minimal schema) so malformed
configs fail at startup; update the symbol stream_definitions in config.ts and
add inline comments documenting the expected fields and mark it experimental if
the shape may change.
In
`@x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts`:
- Around line 37-50: Add a regression test that exercises the internal-user code
path by calling translateClassicStreamPipelineActions with
clusterClient.asInternalUser instead of asCurrentUser; specifically, duplicate
the existing happy-path setup (mocking
clusterClient.asInternalUser.indices.getIndexTemplate to return the same
index_templates payload) and invoke
translateClassicStreamPipelineActions(actionsByType,
clusterClient.asInternalUser) so the test covers the internal-user client path
and prevents reintroducing request-scoped assumptions in
translateClassicStreamPipelineActions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yml
Review profile: CHILL
Plan: Pro
Run ID: cd7fb284-f089-47bc-ac4e-4e727b5c8fc1
📒 Files selected for processing (28)
x-pack/platform/plugins/shared/streams/common/config.tsx-pack/platform/plugins/shared/streams/server/lib/streams/assets/query/query_service.tsx-pack/platform/plugins/shared/streams/server/lib/streams/attachments/attachment_service.tsx-pack/platform/plugins/shared/streams/server/lib/streams/client.tsx-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.tsx-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_service.tsx-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.tsx-pack/platform/plugins/shared/streams/server/lib/streams/service.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/query_stream.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/validate_settings.test.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/validate_settings.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.tsx-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.tsx-pack/platform/plugins/shared/streams/server/lib/streams/stream_crud.tsx-pack/platform/plugins/shared/streams/server/plugin.tsx-pack/platform/plugins/shared/streams/server/routes/internal/streams/failure_store/route.tsx-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_processing_pipeline_route.tsx-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/unmanaged_assets_route.tsx-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.tsx-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.tsx-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.tsx-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.tsx-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts
| this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => ({ | ||
| name, | ||
| request: Streams.all.UpsertRequest.parse( | ||
| ROOT_STREAM_NAMES.includes(name) | ||
| ? { | ||
| ...definition, | ||
| stream: { | ||
| ...definition.stream, | ||
| ingest: { | ||
| ...definition.stream.ingest, | ||
| wired: { | ||
| ...definition.stream.ingest.wired, | ||
| fields: name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields, | ||
| }, |
There was a problem hiding this comment.
Don't silently discard configured root fields.
For root streams this replaces stream.ingest.wired.fields wholesale with baseFields/ecsBaseFields, so any extra fields from xpack.streams.preconfigured.stream_definitions are lost without validation. Either merge the defaults under the configured fields or fail fast when custom root fields are present.
🛠️ Example fix
- fields: name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields,
+ fields: {
+ ...(name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields),
+ ...(definition.stream.ingest.wired.fields ?? {}),
+ },📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => ({ | |
| name, | |
| request: Streams.all.UpsertRequest.parse( | |
| ROOT_STREAM_NAMES.includes(name) | |
| ? { | |
| ...definition, | |
| stream: { | |
| ...definition.stream, | |
| ingest: { | |
| ...definition.stream.ingest, | |
| wired: { | |
| ...definition.stream.ingest.wired, | |
| fields: name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields, | |
| }, | |
| this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => ({ | |
| name, | |
| request: Streams.all.UpsertRequest.parse( | |
| ROOT_STREAM_NAMES.includes(name) | |
| ? { | |
| ...definition, | |
| stream: { | |
| ...definition.stream, | |
| ingest: { | |
| ...definition.stream.ingest, | |
| wired: { | |
| ...definition.stream.ingest.wired, | |
| fields: { | |
| ...(name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields), | |
| ...(definition.stream.ingest.wired.fields ?? {}), | |
| }, |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@x-pack/platform/plugins/shared/streams/server/plugin.ts` around lines 304 -
317, The current mapping of this.config.preconfigured.stream_definitions for
ROOT_STREAM_NAMES (in plugin.ts) overwrites stream.ingest.wired.fields with
baseFields/ecsBaseFields, silently discarding any user-configured root fields;
update the logic in the mapping that calls Streams.all.UpsertRequest.parse so
that for root streams (identified by ROOT_STREAM_NAMES and LOGS_ECS_STREAM_NAME)
you merge existing definition.stream.ingest.wired.fields with the defaults
(baseFields/ecsBaseFields) instead of replacing them, or alternatively detect
and throw a validation error if the incoming definition contains custom root
fields — apply the merge/validation in the same mapping before calling
Streams.all.UpsertRequest.parse so parsing receives the corrected object.
flash1293
left a comment
There was a problem hiding this comment.
Change itself looks good to me, but lets revert the search result provider part for now
| @@ -101,8 +101,11 @@ async function findStreams({ | |||
| }); | |||
|
|
|||
| const hitsWithAccess = searchResponse.hits.hits.filter((hit) => { | |||
| if (Streams.QueryStream.Definition.is(hit._source)) return queryStreamsEnabled; | |||
| return privileges[hit._source.name]?.read === true; | |||
| const canRead = privileges[hit._source.name]?.read === true; | |||
There was a problem hiding this comment.
I'm not sure whether this fix is correct, let's keep the PR focused. we can get back to that
…rack-set-up-streams-through-kibana
⏳ Build in-progress, with failures
Failed CI StepsTest Failures
History
|
## Summary Related to elastic/streams-program#402 This PR allows configuring wired streams definitions in Kibana config and applying them on startup. `StreamsClient` has been refactored to be able to work without Kibana requests, making it possible to pass both scoped and unscoped dependency clients. This change depends on elastic/elasticsearch#143053 **Sample stream definition:** ``` xpack.streams: preconfigured: enabled: true stream_definitions: - name: logs.ecs dashboards: [] queries: [] rules: [] stream: description: "" query_streams: [] ingest: lifecycle: dsl: {} processing: steps: [] settings: {} failure_store: lifecycle: enabled: data_retention: '20d' wired: routing: - destination: logs.ecs.child1 where: field: 'resource.attributes.host.name' startsWith: 'filebeat' status: 'enabled' ``` > [!NOTE] > Just setting `xpack.streams.preconfigured.enabled: true` will only create the root `logs.ecs` and `logs.otel` streams. > [!IMPORTANT] > Since the RulesClient cannot be made unscoped, `queries` and `rules` should always be empty. This is a limitation of the config-based streams for now. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added support for preconfigured streams, allowing streams to be defined and automatically enabled at startup with customizable field sets (ECS or base fields). * **Configuration** * Extended configuration schema to include preconfiguration options for managing stream definitions. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
## Summary Related to https://github.com/elastic/streams-program/issues/402 This PR allows configuring wired streams definitions in Kibana config and applying them on startup. `StreamsClient` has been refactored to be able to work without Kibana requests, making it possible to pass both scoped and unscoped dependency clients. This change depends on elastic/elasticsearch#143053 **Sample stream definition:** ``` xpack.streams: preconfigured: enabled: true stream_definitions: - name: logs.ecs dashboards: [] queries: [] rules: [] stream: description: "" query_streams: [] ingest: lifecycle: dsl: {} processing: steps: [] settings: {} failure_store: lifecycle: enabled: data_retention: '20d' wired: routing: - destination: logs.ecs.child1 where: field: 'resource.attributes.host.name' startsWith: 'filebeat' status: 'enabled' ``` > [!NOTE] > Just setting `xpack.streams.preconfigured.enabled: true` will only create the root `logs.ecs` and `logs.otel` streams. > [!IMPORTANT] > Since the RulesClient cannot be made unscoped, `queries` and `rules` should always be empty. This is a limitation of the config-based streams for now. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added support for preconfigured streams, allowing streams to be defined and automatically enabled at startup with customizable field sets (ECS or base fields). * **Configuration** * Extended configuration schema to include preconfiguration options for managing stream definitions. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
Summary
Related to https://github.com/elastic/streams-program/issues/402
This PR allows configuring wired streams definitions in Kibana config and applying them on startup.
StreamsClienthas been refactored to be able to work without Kibana requests, making it possible to pass both scoped and unscoped dependency clients.This change depends on elastic/elasticsearch#143053
Sample stream definition:
Note
Just setting
xpack.streams.preconfigured.enabled: truewill only create the rootlogs.ecsandlogs.otelstreams.Important
Since the RulesClient cannot be made unscoped,
queriesandrulesshould always be empty. This is a limitation of the config-based streams for now.Summary by CodeRabbit
New Features
Configuration