Skip to content

Streams rally track: Set up streams through Kibana#254747

Merged
nikita-lavrov merged 24 commits intoelastic:mainfrom
nikita-lavrov:402-streams-rally-track-set-up-streams-through-kibana
Mar 10, 2026
Merged

Streams rally track: Set up streams through Kibana#254747
nikita-lavrov merged 24 commits intoelastic:mainfrom
nikita-lavrov:402-streams-rally-track-set-up-streams-through-kibana

Conversation

@nikita-lavrov
Copy link
Copy Markdown
Contributor

@nikita-lavrov nikita-lavrov commented Feb 24, 2026

Summary

Related to https://github.com/elastic/streams-program/issues/402

This PR allows configuring wired streams definitions in Kibana config and applying them on startup.

StreamsClient has been refactored to be able to work without Kibana requests, making it possible to pass both scoped and unscoped dependency clients.

This change depends on elastic/elasticsearch#143053

Sample stream definition:

xpack.streams:
  preconfigured:
    enabled: true
    stream_definitions:
    - name: logs.ecs
      dashboards: []
      queries: []
      rules: []
      stream:
        description: ""
        query_streams: []
        ingest:
          lifecycle:
            dsl: {}
          processing:
            steps: []
          settings: {}
          failure_store:
            lifecycle:
              enabled:
                data_retention: '20d'
          wired:
            routing:
            - destination: logs.ecs.child1
              where:
                field: 'resource.attributes.host.name'
                startsWith: 'filebeat'
              status: 'enabled'

Note

Just setting xpack.streams.preconfigured.enabled: true will only create the root logs.ecs and logs.otel streams.

Important

Since the RulesClient cannot be made unscoped, queries and rules should always be empty. This is a limitation of the config-based streams for now.

Summary by CodeRabbit

  • New Features

    • Added support for preconfigured streams, allowing streams to be defined and automatically enabled at startup with customizable field sets (ECS or base fields).
  • Configuration

    • Extended configuration schema to include preconfiguration options for managing stream definitions.

@nikita-lavrov nikita-lavrov added release_note:skip Skip the PR/issue when compiling release notes backport:skip This PR does not require backporting Team:obs-onboarding Observability Onboarding Team Feature:Streams This is the label for the Streams Project labels Feb 25, 2026
@nikita-lavrov nikita-lavrov marked this pull request as ready for review February 27, 2026 13:00
@nikita-lavrov nikita-lavrov requested review from a team as code owners February 27, 2026 13:00
@elasticmachine
Copy link
Copy Markdown
Contributor

Pinging @elastic/obs-onboarding-team (Team:obs-onboarding)

Copy link
Copy Markdown
Contributor

@flash1293 flash1293 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks pretty good @nikita-lavrov , thanks! Left some comments, but it's close.


export const configSchema = schema.object({});
export const configSchema = schema.object({
preconfigured: schema.object({
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether this exists, but can we somehow mark this as tech preview?

export const configSchema = schema.object({
preconfigured: schema.object({
enabled: schema.boolean({ defaultValue: false }),
forks: schema.arrayOf(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing with forks vs streams feels kinda weird to me, can we somehow keep this closer to the regular shape of streams configurations?


// Use security API to check if user has all required permissions
const securityClient = this.dependencies.scopedClusterClient.asCurrentUser.security;
const securityClient = this.dependencies.currentUser.security;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit verbose in the diff that you had to change this everywhere, but I actually like it! This makes it much harder to accidentally use the wrong client.

Only nit - currentUser doesn't make it clear this is about the es client, that should be somehow in the name

// Since the RulesClient cannot be unscoped, we provide an empty object that will
// throw an error if dashboards, rules, or queries exist in the stream definition.
// This is a limitation of the config-based streams for now.
const rulesClient = {} as RulesClient;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you mock this more explicitly with method stubs that throw a descriptive error? Otherwise things fail with a "can't call undefined" error that doesn't tell us much


await streamsClient.enableStreams();

for (const { name, parent, where, status } of this.config.preconfigured.forks) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's OK if the config schema itself doesn't catch this (since we don't want to duplicate the streams schema in kbn/config), but in here we should use the schema parsing functions to make sure the data coming in has the right shape and fail with the standard zod error if that's not the case.


for (const { name, parent, where, status } of this.config.preconfigured.forks) {
if (!(await streamsClient.existsStream(name))) {
await streamsClient.forkStream({
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in the config schema file, if the config is just a list of streams definition, this is simplified and you can just pass the config to bulkUpsert and be done

Copy link
Copy Markdown
Contributor

@flash1293 flash1293 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really good, works super well! I have one comment in the code and one idea what we could improve - right now you need to repeat the full definition for the root streams because otherwise it will fail:

 wired:
            fields:
              '[@timestamp]':
                type: date
              '[stream.name]':
                type: system
              '[scope.name]':
                type: keyword
                ignore_above: 1024
              '[host.name]':
                type: keyword
                ignore_above: 1024
              '[trace.id]':
                type: keyword
                ignore_above: 1024
              '[span.id]':
                type: keyword
                ignore_above: 1024
              '[service.name]':
                type: keyword
                ignore_above: 1024
              message:
                type: match_only_text
              '[log.level]':
                type: keyword
                ignore_above: 1024
            routing:

This is verbose, but the bigger problem is that once we use that in the benchmark, it will start failing once we e.g. add a field here. Could you change the logic to spread in the defaults for the root streams in particular so you can omit them in the config file and focus on the stuff you actually want to add? This will make our lives easier down the line

}

if (this.config.preconfigured.enabled) {
void core.getStartServices().then(async ([coreStart]) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this void here doing? Is this is suppress the warning about unhandled promises? If yes, we should probably do a little explanation why it's OK in this spot. Also, let's catch exceptions in this promise chain and at least log them so there is some indication of what happened.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 06744bd

@nikita-lavrov
Copy link
Copy Markdown
Contributor Author

/ci

@nikita-lavrov nikita-lavrov requested a review from flash1293 March 5, 2026 10:29
Copy link
Copy Markdown
Contributor

@flash1293 flash1293 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some nits, then it should be good to go

request: Streams.all.UpsertRequest.parse(definition),
};
})
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a success log as well? Otherwise it's a bit magic

esClientAsInternalUser: esClient,
});

await streamsClient.enableStreams();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed that a similar use case in the fleet plugin has retry logic in place because it's possible Elasticsearch isn't fully ready yet in automated scenarios. Could we add that?

await streamsClient.bulkUpsert(
this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => {
if (ROOT_STREAM_NAMES.includes(name)) {
definition.stream.ingest.wired.fields =
Copy link
Copy Markdown
Contributor

@flash1293 flash1293 Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is mutating the root config object, which is something we shouldn't do. Either spread all the layers or do a deep clone. Otherwise another consumer reading this object sees the change from here as well (or not, based on the timing, it's a bug waiting to happen)

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 6, 2026

Caution

Review failed

The head commit changed during the review from 9b849dd to dcce23d.

📝 Walkthrough

Walkthrough

This PR refactors the Streams plugin to eliminate KibanaRequest dependencies and replace IScopedClusterClient with ElasticsearchClient throughout. Service methods are renamed from getClientWithRequest to getClient, accepting explicit client parameters instead. Preconfigured streams initialization is added to plugin setup.

Changes

Cohort / File(s) Summary
Configuration and Type Definitions
x-pack/platform/plugins/shared/streams/common/config.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts
Config schema updated to include preconfigured object with enabled flag and stream_definitions array. StateDependencies interface changes from scopedClusterClient: IScopedClusterClient to esClient: ElasticsearchClient.
Service Layer Client Methods
x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/attachments/attachment_service.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/query/query_service.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_service.ts
Method signatures renamed from getClientWithRequest({ request }) to getClient({ ...clients }), removing KibanaRequest dependency. Clients now passed explicitly as parameters (soClient, rulesClient, esClient, esClientAsInternalUser) instead of derived from request.
Core Client Classes
x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts
StreamsClient constructor updated to remove KibanaRequest and scopedClusterClient dependencies, replacing with esClient and esClientAsInternalUser for all internal Elasticsearch operations.
Stream CRUD and Utilities
x-pack/platform/plugins/shared/streams/server/lib/streams/stream_crud.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts
All public function signatures updated to use esClient: ElasticsearchClient instead of scopedClusterClient: IScopedClusterClient across asset retrieval, template fetchers, and access check functions.
State Management - Execution Plans
x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts
All references to scopedClusterClient replaced with esClient for pipeline and template operations. Function signatures updated to accept ElasticsearchClient instead of IScopedClusterClient.
State Management - Stream Implementations
x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/query_stream.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/validate_settings.ts, x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/validate_settings.test.ts
All stream validation and data access methods updated to use esClient instead of scopedClusterClient for indices, ingest, and ES
Route Handlers - Processing
x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts, x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts, x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts
Route handlers updated to pass esClient (derived from scopedClusterClient.asCurrentUser) instead of raw scopedClusterClient to processing and access check functions.
Route Handlers - Management and Schema
x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_processing_pipeline_route.ts, x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/unmanaged_assets_route.ts, x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts, x-pack/platform/plugins/shared/streams/server/routes/internal/streams/failure_store/route.ts
Routes updated to extract esClient from scopedClusterClient.asCurrentUser for access checks and asset retrieval operations.
Route Handlers - Stream CRUD
x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts
readStream route updated to pass esClient (scopedClusterClient.asCurrentUser) instead of scopedClusterClient for unmanaged assets retrieval.
Plugin Setup
x-pack/platform/plugins/shared/streams/server/plugin.ts
Plugin setup refactored: client retrieval methods updated to use new getClient signatures with explicit clients; preconfigured streams initialization added under setup() with stub RulesClient and per-stream field wiring for ROOT_STREAM_NAMES.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 13.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Streams rally track: Set up streams through Kibana' accurately summarizes the main objective of enabling stream configuration through Kibana with preconfigured settings, which is the primary feature across all the file changes shown.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Tip

Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs).
Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts (1)

98-105: ⚠️ Potential issue | 🟠 Major

Don't bypass read checks for query-stream hits.

storageClient.search() still runs as the internal user, so this branch returns any matching query stream as soon as the UI setting is enabled, even when checkAccessBulk says the caller cannot read it. That leaks stream titles and URLs in global search.

🔐 Suggested fix
-  const hitsWithAccess = searchResponse.hits.hits.filter((hit) => {
-    if (Streams.QueryStream.Definition.is(hit._source)) return queryStreamsEnabled;
-    return privileges[hit._source.name]?.read === true;
-  });
+  const hitsWithAccess = hits.filter((hit) => {
+    const canRead = privileges[hit._source.name]?.read === true;
+    if (Streams.QueryStream.Definition.is(hit._source)) {
+      return queryStreamsEnabled && canRead;
+    }
+    return canRead;
+  });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts`
around lines 98 - 105, The filter for hitsWithAccess currently bypasses
checkAccessBulk for query streams (Streams.QueryStream.Definition.is) when
queryStreamsEnabled is true, leaking titles/URLs; update the hitsWithAccess
predicate to always consult the privileges map from checkAccessBulk
(privileges[name]?.read === true) and only allow query streams when both
queryStreamsEnabled is true AND the privilege check passes (use the
hit._source.name to look up privileges); modify the logic in the hitsWithAccess
filter (the block using Streams.QueryStream.Definition.is, queryStreamsEnabled,
and privileges) so no branch returns true without verifying privileges from
checkAccessBulk.
♻️ Duplicate comments (1)
x-pack/platform/plugins/shared/streams/server/plugin.ts (1)

275-324: ⚠️ Potential issue | 🟠 Major

Validate unsupported alerting config before using the fake RulesClient.

This startup path casts a three-method stub to RulesClient, so unsupported rules/queries only fail once the preconfiguration sync is already running. If reconciliation touches any other RulesClient member, bootstrap can die mid-sync with a generic runtime error. Reject those shapes before enableStreams/bulkUpsert, and make the fallback client throw a descriptive error for any accessed member.

Run the following script to verify which RulesClient members the streams code can hit and whether the preconfigured path rejects unsupported config before side effects begin:

#!/bin/bash
set -euo pipefail

echo "RulesClient member usage under streams:"
rg -n --type=ts -C2 '\brulesClient\.[A-Za-z_][A-Za-z0-9_]*\b' \
  x-pack/platform/plugins/shared/streams/server/lib/streams \
  x-pack/platform/plugins/shared/streams/server/plugin.ts

echo
echo "Preconfigured startup branch:"
sed -n '266,329p' x-pack/platform/plugins/shared/streams/server/plugin.ts

Expected result: only members intentionally supported by the stub are reachable, and unsupported rules/queries are rejected before bulkUpsert starts. As per coding guidelines, "avoid any and unknown."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/platform/plugins/shared/streams/server/plugin.ts` around lines 275 -
324, The current stubbed RulesClient (the object cast to RulesClient) only
implements bulkGetRules/create/update and is used when building
attachmentService.getClient, queryService.getClient and
streamsService.getClient, which risks runtime failures during
streamsClient.enableStreams or streamsClient.bulkUpsert; instead, validate the
preconfigured.rules/queries shapes up-front (before calling enableStreams or
bulkUpsert on streamsClient) and replace the stub with a defensive proxy that
throws a clear, descriptive Error for any accessed/unimplemented member; locate
the RulesClient stub in plugin.ts, add a validation step that inspects
this.config.preconfigured.stream_definitions (and any rules/queries entries) and
rejects invalid shapes before constructing
attachmentClient/featureClient/queryClient, and implement the fallback
RulesClient as a Proxy or wrapper that throws "Unsupported RulesClient method:
<methodName>" for any property access other than the explicitly supported
methods (bulkGetRules, create, update).
🧹 Nitpick comments (2)
x-pack/platform/plugins/shared/streams/common/config.ts (1)

14-14: Prefer a typed schema over schema.any() for stream_definitions.

Using schema.any() bypasses config validation entirely, allowing malformed stream definitions to slip through until runtime. Since stream definitions have a known structure (per PR notes requiring "full definitions" for nested items), consider defining a proper schema—even a minimal one—to catch configuration errors at startup.

If the shape is intentionally flexible during this initial implementation, at minimum document the expected structure inline or consider marking this configuration as experimental/tech preview.

As per coding guidelines: "Use TypeScript for all new code; avoid any and unknown."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/platform/plugins/shared/streams/common/config.ts` at line 14, Replace
the untyped schema.any() used for stream_definitions with a concrete schema that
enforces the known stream definition shape: change stream_definitions to
schema.arrayOf(schema.object(...)) and validate required top-level properties
(e.g., id/name/type as strings, an optional description, and a nested items
array/object with its own minimal schema) so malformed configs fail at startup;
update the symbol stream_definitions in config.ts and add inline comments
documenting the expected fields and mark it experimental if the shape may
change.
x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts (1)

37-50: Add one regression case for the internal-user client path.

These tests still exercise the helper through asCurrentUser, so they don't pin the startup path this refactor adds. Switching one happy-path case to asInternalUser would make it much harder to accidentally reintroduce a request-scoped assumption later.

🧪 Minimal test tweak
-      clusterClient.asCurrentUser.indices.getIndexTemplate.mockImplementationOnce(async () => ({
+      clusterClient.asInternalUser.indices.getIndexTemplate.mockImplementationOnce(async () => ({
         index_templates: [
           {
             name: 'my-template',
             index_template: {
               composed_of: [],
               index_patterns: 'my-*',
             },
           },
         ],
       }));

-      await translateClassicStreamPipelineActions(actionsByType, clusterClient.asCurrentUser);
+      await translateClassicStreamPipelineActions(actionsByType, clusterClient.asInternalUser);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts`
around lines 37 - 50, Add a regression test that exercises the internal-user
code path by calling translateClassicStreamPipelineActions with
clusterClient.asInternalUser instead of asCurrentUser; specifically, duplicate
the existing happy-path setup (mocking
clusterClient.asInternalUser.indices.getIndexTemplate to return the same
index_templates payload) and invoke
translateClassicStreamPipelineActions(actionsByType,
clusterClient.asInternalUser) so the test covers the internal-user client path
and prevents reintroducing request-scoped assumptions in
translateClassicStreamPipelineActions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@x-pack/platform/plugins/shared/streams/server/plugin.ts`:
- Around line 304-317: The current mapping of
this.config.preconfigured.stream_definitions for ROOT_STREAM_NAMES (in
plugin.ts) overwrites stream.ingest.wired.fields with baseFields/ecsBaseFields,
silently discarding any user-configured root fields; update the logic in the
mapping that calls Streams.all.UpsertRequest.parse so that for root streams
(identified by ROOT_STREAM_NAMES and LOGS_ECS_STREAM_NAME) you merge existing
definition.stream.ingest.wired.fields with the defaults
(baseFields/ecsBaseFields) instead of replacing them, or alternatively detect
and throw a validation error if the incoming definition contains custom root
fields — apply the merge/validation in the same mapping before calling
Streams.all.UpsertRequest.parse so parsing receives the corrected object.

---

Outside diff comments:
In
`@x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts`:
- Around line 98-105: The filter for hitsWithAccess currently bypasses
checkAccessBulk for query streams (Streams.QueryStream.Definition.is) when
queryStreamsEnabled is true, leaking titles/URLs; update the hitsWithAccess
predicate to always consult the privileges map from checkAccessBulk
(privileges[name]?.read === true) and only allow query streams when both
queryStreamsEnabled is true AND the privilege check passes (use the
hit._source.name to look up privileges); modify the logic in the hitsWithAccess
filter (the block using Streams.QueryStream.Definition.is, queryStreamsEnabled,
and privileges) so no branch returns true without verifying privileges from
checkAccessBulk.

---

Duplicate comments:
In `@x-pack/platform/plugins/shared/streams/server/plugin.ts`:
- Around line 275-324: The current stubbed RulesClient (the object cast to
RulesClient) only implements bulkGetRules/create/update and is used when
building attachmentService.getClient, queryService.getClient and
streamsService.getClient, which risks runtime failures during
streamsClient.enableStreams or streamsClient.bulkUpsert; instead, validate the
preconfigured.rules/queries shapes up-front (before calling enableStreams or
bulkUpsert on streamsClient) and replace the stub with a defensive proxy that
throws a clear, descriptive Error for any accessed/unimplemented member; locate
the RulesClient stub in plugin.ts, add a validation step that inspects
this.config.preconfigured.stream_definitions (and any rules/queries entries) and
rejects invalid shapes before constructing
attachmentClient/featureClient/queryClient, and implement the fallback
RulesClient as a Proxy or wrapper that throws "Unsupported RulesClient method:
<methodName>" for any property access other than the explicitly supported
methods (bulkGetRules, create, update).

---

Nitpick comments:
In `@x-pack/platform/plugins/shared/streams/common/config.ts`:
- Line 14: Replace the untyped schema.any() used for stream_definitions with a
concrete schema that enforces the known stream definition shape: change
stream_definitions to schema.arrayOf(schema.object(...)) and validate required
top-level properties (e.g., id/name/type as strings, an optional description,
and a nested items array/object with its own minimal schema) so malformed
configs fail at startup; update the symbol stream_definitions in config.ts and
add inline comments documenting the expected fields and mark it experimental if
the shape may change.

In
`@x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts`:
- Around line 37-50: Add a regression test that exercises the internal-user code
path by calling translateClassicStreamPipelineActions with
clusterClient.asInternalUser instead of asCurrentUser; specifically, duplicate
the existing happy-path setup (mocking
clusterClient.asInternalUser.indices.getIndexTemplate to return the same
index_templates payload) and invoke
translateClassicStreamPipelineActions(actionsByType,
clusterClient.asInternalUser) so the test covers the internal-user client path
and prevents reintroducing request-scoped assumptions in
translateClassicStreamPipelineActions.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yml

Review profile: CHILL

Plan: Pro

Run ID: cd7fb284-f089-47bc-ac4e-4e727b5c8fc1

📥 Commits

Reviewing files that changed from the base of the PR and between 41c7233 and 4a32762.

📒 Files selected for processing (28)
  • x-pack/platform/plugins/shared/streams/common/config.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/assets/query/query_service.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/attachments/attachment_service.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/create_streams_global_search_result_provider.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_service.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/execution_plan.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.test.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/execution_plan/translate_classic_stream_pipeline_actions.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/query_stream.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/validate_settings.test.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/validate_settings.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts
  • x-pack/platform/plugins/shared/streams/server/lib/streams/stream_crud.ts
  • x-pack/platform/plugins/shared/streams/server/plugin.ts
  • x-pack/platform/plugins/shared/streams/server/routes/internal/streams/failure_store/route.ts
  • x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_processing_pipeline_route.ts
  • x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/unmanaged_assets_route.ts
  • x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/failure_store_samples_handler.ts
  • x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts
  • x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts
  • x-pack/platform/plugins/shared/streams/server/routes/internal/streams/schema/route.ts
  • x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts

Comment on lines +304 to +317
this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => ({
name,
request: Streams.all.UpsertRequest.parse(
ROOT_STREAM_NAMES.includes(name)
? {
...definition,
stream: {
...definition.stream,
ingest: {
...definition.stream.ingest,
wired: {
...definition.stream.ingest.wired,
fields: name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields,
},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't silently discard configured root fields.

For root streams this replaces stream.ingest.wired.fields wholesale with baseFields/ecsBaseFields, so any extra fields from xpack.streams.preconfigured.stream_definitions are lost without validation. Either merge the defaults under the configured fields or fail fast when custom root fields are present.

🛠️ Example fix
-                            fields: name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields,
+                            fields: {
+                              ...(name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields),
+                              ...(definition.stream.ingest.wired.fields ?? {}),
+                            },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => ({
name,
request: Streams.all.UpsertRequest.parse(
ROOT_STREAM_NAMES.includes(name)
? {
...definition,
stream: {
...definition.stream,
ingest: {
...definition.stream.ingest,
wired: {
...definition.stream.ingest.wired,
fields: name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields,
},
this.config.preconfigured.stream_definitions.map(({ name, ...definition }) => ({
name,
request: Streams.all.UpsertRequest.parse(
ROOT_STREAM_NAMES.includes(name)
? {
...definition,
stream: {
...definition.stream,
ingest: {
...definition.stream.ingest,
wired: {
...definition.stream.ingest.wired,
fields: {
...(name === LOGS_ECS_STREAM_NAME ? ecsBaseFields : baseFields),
...(definition.stream.ingest.wired.fields ?? {}),
},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@x-pack/platform/plugins/shared/streams/server/plugin.ts` around lines 304 -
317, The current mapping of this.config.preconfigured.stream_definitions for
ROOT_STREAM_NAMES (in plugin.ts) overwrites stream.ingest.wired.fields with
baseFields/ecsBaseFields, silently discarding any user-configured root fields;
update the logic in the mapping that calls Streams.all.UpsertRequest.parse so
that for root streams (identified by ROOT_STREAM_NAMES and LOGS_ECS_STREAM_NAME)
you merge existing definition.stream.ingest.wired.fields with the defaults
(baseFields/ecsBaseFields) instead of replacing them, or alternatively detect
and throw a validation error if the incoming definition contains custom root
fields — apply the merge/validation in the same mapping before calling
Streams.all.UpsertRequest.parse so parsing receives the corrected object.

Copy link
Copy Markdown
Contributor

@flash1293 flash1293 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change itself looks good to me, but lets revert the search result provider part for now

@@ -101,8 +101,11 @@ async function findStreams({
});

const hitsWithAccess = searchResponse.hits.hits.filter((hit) => {
if (Streams.QueryStream.Definition.is(hit._source)) return queryStreamsEnabled;
return privileges[hit._source.name]?.read === true;
const canRead = privileges[hit._source.name]?.read === true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether this fix is correct, let's keep the PR focused. we can get back to that

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted in da15a9f

@elasticmachine
Copy link
Copy Markdown
Contributor

⏳ Build in-progress, with failures

Failed CI Steps

Test Failures

  • [job] [logs] FTR Configs #109 / apis Streams Endpoints Basic functionality initially after enabling "before all" hook for "reports enabled status"
  • [job] [logs] Jest Tests #9 / WiredStream doDetermineCreateActions - ES|QL view does not emit upsert_esql_view when isWiredStreamViewsEnabled is false
  • [job] [logs] Jest Tests #9 / WiredStream doDetermineCreateActions - ES|QL view does not emit upsert_esql_view when isWiredStreamViewsEnabled is false
  • [job] [logs] Jest Tests #9 / WiredStream doDetermineCreateActions - ES|QL view includes upsert_esql_view action with correct name and query
  • [job] [logs] Jest Tests #9 / WiredStream doDetermineCreateActions - ES|QL view includes upsert_esql_view action with correct name and query
  • [job] [logs] Jest Tests #9 / WiredStream doDetermineCreateActions - ES|QL view uses the stream name to build the view query
  • [job] [logs] Jest Tests #9 / WiredStream doDetermineCreateActions - ES|QL view uses the stream name to build the view query
  • [job] [logs] Jest Tests #9 / WiredStream ES|QL view actions doDetermineCreateActions creates a view referencing child views when the stream has children
  • [job] [logs] Jest Tests #9 / WiredStream ES|QL view actions doDetermineCreateActions creates a view referencing child views when the stream has children
  • [job] [logs] Jest Tests #9 / WiredStream ES|QL view actions doDetermineCreateActions creates a view with only the stream itself when there are no children
  • [job] [logs] Jest Tests #9 / WiredStream ES|QL view actions doDetermineCreateActions creates a view with only the stream itself when there are no children

History

@nikita-lavrov nikita-lavrov merged commit 0c21c8a into elastic:main Mar 10, 2026
18 checks passed
qn895 pushed a commit to qn895/kibana that referenced this pull request Mar 11, 2026
## Summary

Related to elastic/streams-program#402

This PR allows configuring wired streams definitions in Kibana config
and applying them on startup.

`StreamsClient` has been refactored to be able to work without Kibana
requests, making it possible to pass both scoped and unscoped dependency
clients.

This change depends on
elastic/elasticsearch#143053

**Sample stream definition:**
```
xpack.streams:
  preconfigured:
    enabled: true
    stream_definitions:
    - name: logs.ecs
      dashboards: []
      queries: []
      rules: []
      stream:
        description: ""
        query_streams: []
        ingest:
          lifecycle:
            dsl: {}
          processing:
            steps: []
          settings: {}
          failure_store:
            lifecycle:
              enabled:
                data_retention: '20d'
          wired:
            routing:
            - destination: logs.ecs.child1
              where:
                field: 'resource.attributes.host.name'
                startsWith: 'filebeat'
              status: 'enabled'
``` 
> [!NOTE]
> Just setting `xpack.streams.preconfigured.enabled: true` will only
create the root `logs.ecs` and `logs.otel` streams.

> [!IMPORTANT]
> Since the RulesClient cannot be made unscoped, `queries` and `rules`
should always be empty. This is a limitation of the config-based streams
for now.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Added support for preconfigured streams, allowing streams to be
defined and automatically enabled at startup with customizable field
sets (ECS or base fields).

* **Configuration**
* Extended configuration schema to include preconfiguration options for
managing stream definitions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
sorenlouv pushed a commit that referenced this pull request Mar 17, 2026
## Summary

Related to https://github.com/elastic/streams-program/issues/402

This PR allows configuring wired streams definitions in Kibana config
and applying them on startup.

`StreamsClient` has been refactored to be able to work without Kibana
requests, making it possible to pass both scoped and unscoped dependency
clients.

This change depends on
elastic/elasticsearch#143053

**Sample stream definition:**
```
xpack.streams:
  preconfigured:
    enabled: true
    stream_definitions:
    - name: logs.ecs
      dashboards: []
      queries: []
      rules: []
      stream:
        description: ""
        query_streams: []
        ingest:
          lifecycle:
            dsl: {}
          processing:
            steps: []
          settings: {}
          failure_store:
            lifecycle:
              enabled:
                data_retention: '20d'
          wired:
            routing:
            - destination: logs.ecs.child1
              where:
                field: 'resource.attributes.host.name'
                startsWith: 'filebeat'
              status: 'enabled'
``` 
> [!NOTE]
> Just setting `xpack.streams.preconfigured.enabled: true` will only
create the root `logs.ecs` and `logs.otel` streams.

> [!IMPORTANT]
> Since the RulesClient cannot be made unscoped, `queries` and `rules`
should always be empty. This is a limitation of the config-based streams
for now.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Added support for preconfigured streams, allowing streams to be
defined and automatically enabled at startup with customizable field
sets (ECS or base fields).

* **Configuration**
* Extended configuration schema to include preconfiguration options for
managing stream definitions.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport:skip This PR does not require backporting Feature:Streams This is the label for the Streams Project release_note:skip Skip the PR/issue when compiling release notes Team:obs-onboarding Observability Onboarding Team v9.4.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants