Skip to content

Commit 02817c8

Browse files
[8.19] 🌊 Show detected field types for classic streams enrichment (#222579) (#222798)
# Backport This will backport the following commits from `main` to `8.19`: - [🌊 Show detected field types for classic streams enrichment (#222579)](#222579) <!--- Backport version: 9.6.6 --> ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sorenlouv/backport) <!--BACKPORT [{"author":{"name":"Joe Reuter","email":"johannes.reuter@elastic.co"},"sourceCommit":{"committedDate":"2025-06-05T11:45:32Z","message":"🌊 Show detected field types for classic streams enrichment (#222579)\n\nAdds \"detected fields\" tab for classic streams enrichment editor\n\n<img width=\"1005\" alt=\"Screenshot 2025-06-04 at 17 58 57\"\nsrc=\"https://github.com/user-attachments/assets/3f3bc959-a27d-4e53-af96-153f0cd0fb54\"\n/>\n\nThis PR adds the \"detected fields\" tab for classic streams by fetching\nthe actual Elasticsearch field type from field caps and showing it along\nwith the detected fields. This currently doesn't work for fields that\nare not mapped yet but would get added as part of the simulation\n(Elasticsearch feature request here:\nhttps://github.com/elastic/elasticsearch/issues/128760 ).\n\nThis adds a new column \"Elasticsearch field type\" to the schema editor\ntable. For wired streams, this column is not relevant at all, but it can\nbe helpful for classic streams to highlight the non-managed parts.\n\n---------\n\nCo-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>","sha":"36699ad4ae5c2f9ba212e05e55c6ee9692a9b2e5","branchLabelMapping":{"^v9.1.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Team:obs-ux-logs","backport:version","Feature:Streams","v9.1.0","v8.19.0"],"title":"🌊 Show detected field types for classic streams enrichment","number":222579,"url":"https://github.com/elastic/kibana/pull/222579","mergeCommit":{"message":"🌊 Show detected field types for classic streams enrichment (#222579)\n\nAdds \"detected fields\" tab for classic streams enrichment editor\n\n<img width=\"1005\" alt=\"Screenshot 2025-06-04 at 17 58 57\"\nsrc=\"https://github.com/user-attachments/assets/3f3bc959-a27d-4e53-af96-153f0cd0fb54\"\n/>\n\nThis PR adds the \"detected fields\" tab for classic streams by fetching\nthe actual Elasticsearch field type from field caps and showing it along\nwith the detected fields. This currently doesn't work for fields that\nare not mapped yet but would get added as part of the simulation\n(Elasticsearch feature request here:\nhttps://github.com/elastic/elasticsearch/issues/128760 ).\n\nThis adds a new column \"Elasticsearch field type\" to the schema editor\ntable. For wired streams, this column is not relevant at all, but it can\nbe helpful for classic streams to highlight the non-managed parts.\n\n---------\n\nCo-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>","sha":"36699ad4ae5c2f9ba212e05e55c6ee9692a9b2e5"}},"sourceBranch":"main","suggestedTargetBranches":["8.19"],"targetPullRequestStates":[{"branch":"main","label":"v9.1.0","branchLabelMappingKey":"^v9.1.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/222579","number":222579,"mergeCommit":{"message":"🌊 Show detected field types for classic streams enrichment (#222579)\n\nAdds \"detected fields\" tab for classic streams enrichment editor\n\n<img width=\"1005\" alt=\"Screenshot 2025-06-04 at 17 58 57\"\nsrc=\"https://github.com/user-attachments/assets/3f3bc959-a27d-4e53-af96-153f0cd0fb54\"\n/>\n\nThis PR adds the \"detected fields\" tab for classic streams by fetching\nthe actual Elasticsearch field type from field caps and showing it along\nwith the detected fields. This currently doesn't work for fields that\nare not mapped yet but would get added as part of the simulation\n(Elasticsearch feature request here:\nhttps://github.com/elastic/elasticsearch/issues/128760 ).\n\nThis adds a new column \"Elasticsearch field type\" to the schema editor\ntable. For wired streams, this column is not relevant at all, but it can\nbe helpful for classic streams to highlight the non-managed parts.\n\n---------\n\nCo-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>","sha":"36699ad4ae5c2f9ba212e05e55c6ee9692a9b2e5"}},{"branch":"8.19","label":"v8.19.0","branchLabelMappingKey":"^v(\\d+).(\\d+).\\d+$","isSourceBranch":false,"state":"NOT_CREATED"}]}] BACKPORT--> Co-authored-by: Joe Reuter <johannes.reuter@elastic.co>
1 parent 4bc0bd7 commit 02817c8

7 files changed

Lines changed: 117 additions & 55 deletions

File tree

β€Žx-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.tsβ€Ž

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
IndicesIndexState,
1919
SimulateIngestResponse,
2020
SimulateIngestSimulateIngestDocumentResult,
21+
FieldCapsResponse,
2122
} from '@elastic/elasticsearch/lib/api/types';
2223
import { IScopedClusterClient } from '@kbn/core/server';
2324
import { flattenObjectNestedLast, calculateObjectDiff } from '@kbn/object-utils';
@@ -138,10 +139,10 @@ export type IngestSimulationResult =
138139
};
139140

140141
export type DetectedField =
141-
| WithName
142-
| WithName<FieldDefinitionConfig | InheritedFieldDefinitionConfig>;
142+
| WithNameAndEsType
143+
| WithNameAndEsType<FieldDefinitionConfig | InheritedFieldDefinitionConfig>;
143144

144-
export type WithName<TObj = {}> = TObj & { name: string };
145+
export type WithNameAndEsType<TObj = {}> = TObj & { name: string; esType?: string };
145146
export type WithRequired<TObj, TKey extends keyof TObj> = TObj & { [TProp in TKey]-?: TObj[TProp] };
146147

147148
export const simulateProcessing = async ({
@@ -150,15 +151,20 @@ export const simulateProcessing = async ({
150151
streamsClient,
151152
}: SimulateProcessingDeps) => {
152153
/* 0. Retrieve required data to prepare the simulation */
153-
const [stream, streamIndex] = await Promise.all([
154-
streamsClient.getStream(params.path.name),
155-
getStreamIndex(scopedClusterClient, streamsClient, params.path.name),
156-
]);
154+
const [stream, { indexState: streamIndexState, fieldCaps: streamIndexFieldCaps }] =
155+
await Promise.all([
156+
streamsClient.getStream(params.path.name),
157+
getStreamIndex(scopedClusterClient, streamsClient, params.path.name),
158+
]);
157159

158160
/* 1. Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */
159161
const simulationData = prepareSimulationData(params);
160162
const pipelineSimulationBody = preparePipelineSimulationBody(simulationData);
161-
const ingestSimulationBody = prepareIngestSimulationBody(simulationData, streamIndex, params);
163+
const ingestSimulationBody = prepareIngestSimulationBody(
164+
simulationData,
165+
streamIndexState,
166+
params
167+
);
162168
/**
163169
* 2. Run both pipeline and ingest simulations in parallel.
164170
* - The pipeline simulation is used to extract the documents reports and the processor metrics. This always runs.
@@ -189,7 +195,12 @@ export const simulateProcessing = async ({
189195
);
190196

191197
/* 5. Extract valid detected fields asserting existing mapped fields from stream and ancestors */
192-
const detectedFields = await computeDetectedFields(processorsMetrics, params, streamFields);
198+
const detectedFields = await computeDetectedFields(
199+
processorsMetrics,
200+
params,
201+
streamFields,
202+
streamIndexFieldCaps
203+
);
193204

194205
/* 6. Derive general insights and process final response body */
195206
return prepareSimulationResponse(docReports, processorsMetrics, detectedFields);
@@ -746,18 +757,30 @@ const getStreamIndex = async (
746757
scopedClusterClient: IScopedClusterClient,
747758
streamsClient: StreamsClient,
748759
streamName: string
749-
): Promise<IndicesIndexState> => {
760+
): Promise<{
761+
indexState: IndicesIndexState;
762+
fieldCaps: FieldCapsResponse['fields'];
763+
}> => {
750764
const dataStream = await streamsClient.getDataStream(streamName);
751-
const lastIndex = dataStream.indices.at(-1);
752-
if (!lastIndex) {
765+
const lastIndexRef = dataStream.indices.at(-1);
766+
if (!lastIndexRef) {
753767
throw new Error(`No writing index found for stream ${streamName}`);
754768
}
755769

756-
const lastIndexMapping = await scopedClusterClient.asCurrentUser.indices.get({
757-
index: lastIndex.index_name,
758-
});
770+
const [lastIndex, lastIndexFieldCaps] = await Promise.all([
771+
scopedClusterClient.asCurrentUser.indices.get({
772+
index: lastIndexRef.index_name,
773+
}),
774+
scopedClusterClient.asCurrentUser.fieldCaps({
775+
index: lastIndexRef.index_name,
776+
fields: '*',
777+
}),
778+
]);
759779

760-
return lastIndexMapping[lastIndex.index_name];
780+
return {
781+
indexState: lastIndex[lastIndexRef.index_name],
782+
fieldCaps: lastIndexFieldCaps.fields,
783+
};
761784
};
762785

763786
const getStreamFields = async (
@@ -782,7 +805,8 @@ const getStreamFields = async (
782805
const computeDetectedFields = async (
783806
processorsMetrics: Record<string, ProcessorMetrics>,
784807
params: ProcessingSimulationParams,
785-
streamFields: FieldDefinition
808+
streamFields: FieldDefinition,
809+
streamFieldCaps: FieldCapsResponse['fields']
786810
): Promise<DetectedField[]> => {
787811
const fields = Object.values(processorsMetrics).flatMap((metrics) => metrics.detected_fields);
788812

@@ -801,7 +825,11 @@ const computeDetectedFields = async (
801825
return { name, ...existingField };
802826
}
803827

804-
return { name, type: confirmedValidDetectedFields[name]?.type };
828+
const existingFieldCaps = Object.keys(streamFieldCaps[name] || {});
829+
830+
const esType = existingFieldCaps.length > 0 ? existingFieldCaps[0] : undefined;
831+
832+
return { name, type: confirmedValidDetectedFields[name]?.type, esType };
805833
});
806834
};
807835

β€Žx-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/index.tsxβ€Ž

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export function SchemaEditor({
2525
withControls = false,
2626
withFieldSimulation = false,
2727
withTableActions = false,
28+
withToolbar = true,
2829
}: SchemaEditorProps) {
2930
const [controls, updateControls] = useControls();
3031

@@ -51,6 +52,7 @@ export function SchemaEditor({
5152
<FieldsTable
5253
isLoading={isLoading ?? false}
5354
controls={controls}
55+
withToolbar={withToolbar}
5456
defaultColumns={defaultColumns}
5557
fields={fields}
5658
stream={stream}

β€Žx-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsxβ€Ž

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import {
1313
EuiDataGrid,
1414
EuiDataGridCellProps,
1515
EuiDataGridControlColumn,
16+
EuiIconTip,
17+
EuiFlexGroup,
1618
} from '@elastic/eui';
1719
import { i18n } from '@kbn/i18n';
1820
import { Streams } from '@kbn/streams-schema';
@@ -32,13 +34,15 @@ export function FieldsTable({
3234
fields,
3335
stream,
3436
withTableActions,
37+
withToolbar,
3538
}: {
3639
isLoading: boolean;
3740
controls: TControls;
3841
defaultColumns: TableColumnName[];
3942
fields: SchemaField[];
4043
stream: Streams.ingest.all.Definition;
4144
withTableActions: boolean;
45+
withToolbar: boolean;
4246
}) {
4347
// Column visibility
4448
const [visibleColumns, setVisibleColumns] = useState<string[]>(defaultColumns);
@@ -82,7 +86,7 @@ export function FieldsTable({
8286
canDragAndDropColumns: false,
8387
}}
8488
sorting={{ columns: sortingColumns, onSort: setSortingColumns }}
85-
toolbarVisibility={true}
89+
toolbarVisibility={withToolbar}
8690
rowCount={filteredFields.length}
8791
renderCellValue={RenderCellValue}
8892
trailingControlColumns={trailingColumns}
@@ -107,7 +111,26 @@ const createCellRenderer =
107111
const { parent, status } = field;
108112

109113
if (columnId === 'type') {
110-
if (!field.type) return EMPTY_CONTENT;
114+
if (!field.type) {
115+
if (field.status === 'unmapped' && field.esType) {
116+
return (
117+
<EuiFlexGroup alignItems="center" gutterSize="xs" responsive={false}>
118+
{field.esType}
119+
<EuiIconTip
120+
content={i18n.translate(
121+
'xpack.streams.streamDetailSchemaEditorFieldsTableTypeEsTypeTooltip',
122+
{
123+
defaultMessage:
124+
'This field is not managed by Streams, but is defined in Elasticsearch. It can be controlled via the underlying index template and component templates available in the "Advanced" tab.',
125+
}
126+
)}
127+
position="right"
128+
/>
129+
</EuiFlexGroup>
130+
);
131+
}
132+
return EMPTY_CONTENT;
133+
}
111134
return <FieldType type={field.type} aliasFor={field.alias_for} />;
112135
}
113136

β€Žx-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/types.tsβ€Ž

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ export interface MappedSchemaField extends BaseSchemaField {
3131
export interface UnmappedSchemaField extends BaseSchemaField {
3232
status: 'unmapped';
3333
type?: SchemaFieldType;
34+
/**
35+
* Elasticsearch-level type of the field - only available for fields of classic streams that are not mapped through streams but from the underlying index.
36+
*/
37+
esType?: string;
3438
additionalParameters?: FieldDefinitionConfigAdvancedParameters;
3539
}
3640

@@ -47,6 +51,7 @@ export interface SchemaEditorProps {
4751
withControls?: boolean;
4852
withFieldSimulation?: boolean;
4953
withTableActions?: boolean;
54+
withToolbar?: boolean;
5055
}
5156

5257
export const isSchemaFieldTyped = (field: SchemaField): field is MappedSchemaField => {

β€Žx-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/detected_fields_editor.tsxβ€Ž

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ import { SchemaField } from '../schema_editor/types';
1616
import { useStreamEnrichmentEvents } from './state_management/stream_enrichment_state_machine';
1717

1818
interface DetectedFieldsEditorProps {
19-
definition: Streams.WiredStream.GetResponse;
19+
definition: Streams.ingest.all.GetResponse;
2020
detectedFields: SchemaField[];
2121
}
2222

2323
export const DetectedFieldsEditor = ({ definition, detectedFields }: DetectedFieldsEditorProps) => {
2424
const { euiTheme } = useEuiTheme();
2525

2626
const { mapField, unmapField } = useStreamEnrichmentEvents();
27+
const isWiredStream = Streams.WiredStream.GetResponse.is(definition);
2728

2829
const hasFields = detectedFields.length > 0;
2930

@@ -49,26 +50,32 @@ export const DetectedFieldsEditor = ({ definition, detectedFields }: DetectedFie
4950

5051
return (
5152
<>
52-
<EuiText
53-
component="p"
54-
color="subdued"
55-
size="xs"
56-
css={css`
57-
margin-bottom: ${euiTheme.size.base};
58-
`}
59-
>
60-
{i18n.translate(
61-
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFieldsHeadline',
62-
{ defaultMessage: 'You can review and adjust saved fields further in the Schema Editor.' }
63-
)}
64-
</EuiText>
53+
{isWiredStream && (
54+
<EuiText
55+
component="p"
56+
color="subdued"
57+
size="xs"
58+
css={css`
59+
margin-bottom: ${euiTheme.size.base};
60+
`}
61+
>
62+
{i18n.translate(
63+
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFieldsHeadline',
64+
{
65+
defaultMessage:
66+
'You can review and adjust saved fields further in the Schema Editor.',
67+
}
68+
)}
69+
</EuiText>
70+
)}
6571
<SchemaEditor
66-
defaultColumns={['name', 'type', 'format', 'status']}
72+
defaultColumns={isWiredStream ? ['name', 'type', 'format', 'status'] : ['name', 'type']}
6773
fields={detectedFields}
6874
stream={definition.stream}
6975
onFieldUnmap={unmapField}
7076
onFieldUpdate={mapField}
71-
withTableActions
77+
withTableActions={isWiredStream}
78+
withToolbar={isWiredStream}
7279
/>
7380
</>
7481
);

β€Žx-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/simulation_playground.tsxβ€Ž

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import {
1515
EuiTab,
1616
EuiTabs,
1717
} from '@elastic/eui';
18-
import { Streams } from '@kbn/streams-schema';
1918
import { ProcessorOutcomePreview } from './processor_outcome_preview';
2019
import {
2120
useSimulatorSelector,
@@ -47,7 +46,6 @@ export const SimulationPlayground = () => {
4746
);
4847

4948
const definition = useStreamsEnrichmentSelector((state) => state.context.definition);
50-
const canViewDetectedFields = Streams.WiredStream.GetResponse.is(definition);
5149

5250
return (
5351
<>
@@ -59,28 +57,26 @@ export const SimulationPlayground = () => {
5957
{ defaultMessage: 'Data preview' }
6058
)}
6159
</EuiTab>
62-
{canViewDetectedFields && (
63-
<EuiTab
64-
isSelected={isViewingDetectedFields}
65-
onClick={viewSimulationDetectedFields}
66-
append={
67-
detectedFields.length > 0 ? (
68-
<EuiNotificationBadge size="m">{detectedFields.length}</EuiNotificationBadge>
69-
) : undefined
70-
}
71-
>
72-
{i18n.translate(
73-
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields',
74-
{ defaultMessage: 'Detected fields' }
75-
)}
76-
</EuiTab>
77-
)}
60+
<EuiTab
61+
isSelected={isViewingDetectedFields}
62+
onClick={viewSimulationDetectedFields}
63+
append={
64+
detectedFields.length > 0 ? (
65+
<EuiNotificationBadge size="m">{detectedFields.length}</EuiNotificationBadge>
66+
) : undefined
67+
}
68+
>
69+
{i18n.translate(
70+
'xpack.streams.streamDetailView.managementTab.enrichment.simulationPlayground.detectedFields',
71+
{ defaultMessage: 'Detected fields' }
72+
)}
73+
</EuiTab>
7874
</EuiTabs>
7975
{isLoading && <EuiProgress size="xs" color="accent" position="absolute" />}
8076
</EuiFlexItem>
8177
<EuiSpacer size="m" />
8278
{isViewingDataPreview && <ProcessorOutcomePreview />}
83-
{isViewingDetectedFields && canViewDetectedFields && (
79+
{isViewingDetectedFields && (
8480
<DetectedFieldsEditor definition={definition} detectedFields={detectedFields} />
8581
)}
8682
</>

β€Žx-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/state_management/simulation_state_machine/utils.tsβ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export function getSchemaFieldsFromSimulation(
119119
// Detected field still unmapped
120120
return {
121121
status: 'unmapped',
122+
esType: field.esType,
122123
name: field.name,
123124
parent: streamName,
124125
};

0 commit comments

Comments
Β (0)