[ES|QL] Add schema reconciliation for multi-file external sources #145220
costin merged 5 commits into elastic:main
Pinging @elastic/es-analytical-engine (Team:Analytics)
Adds planning-time schema reconciliation for external file sources that span multiple files with potentially different schemas. Supports STRICT (exact match) and UNION_BY_NAME (merge by column name with safe type widening) strategies via the schema_resolution WITH clause parameter.
Core changes:
- SchemaReconciliation: reconciliation algorithms with ColumnMapping that handles both planning-time use and wire serialization (CastType enum ordinals, no strings on the wire)
- SchemaAdaptingIterator: execution-time adapter that reorders columns, inserts NULL blocks for missing columns, and casts blocks for type widening (INTEGER→LONG, INTEGER→DOUBLE, DATETIME→DATE_NANOS)
- ExternalSourceResolver: parallel metadata reading with bounded concurrency (semaphore + MAX_PARALLEL_METADATA_READS)
- FileSplit carries nullable ColumnMapping; FileSplitProvider shares the same instance across all splits from the same file via dedup cache
Developed with AI-assisted tooling
Fix schema reconciliation gaps for multi-file sources
Wire SchemaAdaptingIterator into the async execution path and fix partition column dimensionality in both sync/async paths by using attributes.subList(0, columnCount()). Demote per-query schema reconciliation timing log to debug. Add ColumnMapping round-trip serialization tests and unit tests for SchemaAdaptingIterator covering all cast types, null fill, reorder, empty page, failure cleanup, and close delegation.
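The adapter behaviour named in these commits (reorder, NULL fill, widening casts) can be illustrated with a minimal sketch. It does not use the real ES|QL Page/Block types: pages are plain column-major `Object[][]` arrays, and `Cast`, `AdaptingIteratorSketch`, and the millisecond-to-nanosecond conversion for DATETIME→DATE_NANOS are assumptions made for illustration, not the PR's actual classes.

```java
import java.util.Collections;
import java.util.Iterator;

/** Hypothetical stand-in for the PR's cast enum; the constant names are illustrative. */
enum Cast { NONE, INT_TO_LONG, INT_TO_DOUBLE, MILLIS_TO_NANOS }

/** Adapts pages read from one file to the reconciled (target) column layout. */
final class AdaptingIteratorSketch implements Iterator<Object[][]> {
    private final Iterator<Object[][]> source; // pages as column-major arrays: page[col][row]
    private final int[] sourceIndex;           // per target column: index in the file, or -1 if missing
    private final Cast[] casts;                // per target column: widening cast to apply

    AdaptingIteratorSketch(Iterator<Object[][]> source, int[] sourceIndex, Cast[] casts) {
        // Constructor invariant: one mapping entry per target column.
        if (sourceIndex.length != casts.length) {
            throw new IllegalArgumentException("mapping arrays must have the same length");
        }
        this.source = source;
        this.sourceIndex = sourceIndex;
        this.casts = casts;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public Object[][] next() {
        Object[][] in = source.next();
        int rows = in.length == 0 ? 0 : in[0].length;
        Object[][] out = new Object[sourceIndex.length][];
        for (int t = 0; t < sourceIndex.length; t++) {
            if (sourceIndex[t] < 0) {
                out[t] = new Object[rows]; // column missing from this file: all nulls
            } else {
                out[t] = castColumn(in[sourceIndex[t]], casts[t]); // reorder, then widen
            }
        }
        return out;
    }

    private static Object[] castColumn(Object[] column, Cast cast) {
        if (cast == Cast.NONE) {
            return column;
        }
        Object[] result = new Object[column.length];
        for (int r = 0; r < column.length; r++) {
            result[r] = column[r] == null ? null : castValue(column[r], cast);
        }
        return result;
    }

    private static Object castValue(Object v, Cast cast) {
        switch (cast) {
            case INT_TO_LONG: return ((Integer) v).longValue();
            case INT_TO_DOUBLE: return ((Integer) v).doubleValue();
            // Assumption: DATETIME holds epoch millis and DATE_NANOS holds epoch nanos.
            case MILLIS_TO_NANOS: return Math.multiplyExact((Long) v, 1_000_000L);
            default: return v;
        }
    }

    public static void main(String[] args) {
        Object[][] page = { { 1, 2 }, { 10L, null } }; // file columns: id (INTEGER), ts (DATETIME)
        Iterator<Object[][]> it = new AdaptingIteratorSketch(
            Collections.singletonList(page).iterator(),
            new int[] { 1, -1, 0 },
            new Cast[] { Cast.MILLIS_TO_NANOS, Cast.NONE, Cast.INT_TO_LONG });
        System.out.println(java.util.Arrays.deepToString(it.next()));
    }
}
```

The mapping is computed once per file at planning time, so the per-page work at execution time is limited to index lookups and the casts themselves.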
Harden schema reconciliation gaps and clean up async factory
- Sync factory: mapping-aware column projection in openFileSplit
- SchemaAdaptingIterator: constructor invariant for schema/mapping size
- Async factory: wire adaptSchema into startMultiFileRead path
- Remove unused logger field and trivial FileSplit adaptSchema overload
c8489aa to 2f1bcad
bpintea left a comment
LG, except the serialization.
    if (ordinal < 0 || ordinal >= VALUES.length) {
        throw new IllegalArgumentException("Unknown cast ordinal: " + ordinal);
    }
    return VALUES[ordinal];
This will probably cause bwc issues when we update this enum. We usually serialize the strings to prevent that.
190084c to e5ef28e
Use writeEnum/readEnum for CastType serialization
Switch from hand-rolled ordinal byte encoding to the standard ES writeEnum/readEnum pattern. Add assertEnumSerialization test to pin ordinal-to-value mapping per ES convention.
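Because the commit pins the ordinal-to-value mapping, the wire format presumably still depends on declaration order, and the pinning test is what guards against accidental reordering. The sketch below illustrates that idea with plain `java.nio` rather than the ES stream classes; `CastKind`, `EnumWireSketch`, and the constant names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical cast enum; the constant names are illustrative, not the PR's. */
enum CastKind { NONE, INT_TO_LONG, INT_TO_DOUBLE, DATETIME_TO_DATE_NANOS }

final class EnumWireSketch {
    private static final CastKind[] VALUES = CastKind.values();

    /** Writes the ordinal, so the encoding depends on declaration order. */
    static byte[] write(CastKind kind) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(kind.ordinal()).array();
    }

    /** Reads an ordinal back, rejecting values this version does not know. */
    static CastKind read(byte[] wire) {
        int ordinal = ByteBuffer.wrap(wire).getInt();
        if (ordinal < 0 || ordinal >= VALUES.length) {
            throw new IllegalArgumentException("Unknown cast ordinal: " + ordinal);
        }
        return VALUES[ordinal];
    }

    /** Pins each ordinal so that reordering or inserting constants fails loudly. */
    static void assertOrdinalsPinned() {
        pin(CastKind.NONE, 0);
        pin(CastKind.INT_TO_LONG, 1);
        pin(CastKind.INT_TO_DOUBLE, 2);
        pin(CastKind.DATETIME_TO_DATE_NANOS, 3);
        if (VALUES.length != 4) {
            throw new AssertionError("new constant added: pin its ordinal here");
        }
    }

    private static void pin(CastKind kind, int expectedOrdinal) {
        if (kind.ordinal() != expectedOrdinal) {
            throw new AssertionError(kind + " moved to ordinal " + kind.ordinal());
        }
    }

    public static void main(String[] args) {
        assertOrdinalsPinned();
        System.out.println(read(write(CastKind.INT_TO_DOUBLE)));
    }
}
```

With a check like this in place, new constants can only be appended at the end without breaking older nodes, which is the backward-compatibility concern raised in the review.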
e5ef28e to b69c1d8
@costin, just noticed, the PR description needs updating too.
Adds schema reconciliation for external file sources that span multiple files
with potentially different schemas. Users select a strategy via the
schema_resolution parameter of the WITH clause.
Problem
FIRST_FILE_WINS (the current default) silently ignores schema differences
across files. This causes wrong column values when files have different column
ordering, or runtime errors when types don't match, with no clear message
pointing to the root cause.
Strategies
- strict: all files must have exactly the same schema. A mismatch fails at
  planning time with a descriptive error naming the offending file and column,
  and a hint to use union_by_name.
- union_by_name: schemas are merged by column name, and missing columns are
  NULL-filled. Only lossless type widening is allowed (INTEGER→LONG,
  INTEGER→DOUBLE, DATETIME→DATE_NANOS). LONG→DOUBLE is rejected (lossy above
  2^53), matching DuckDB and Spark consensus.
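As a rough illustration of the union_by_name merge described above, the following sketch reconciles per-file schemas held in plain maps. The class `UnionByNameSketch`, its `Type` enum, and the symmetric treatment of widening (either file may hold the narrower type) are assumptions for illustration; the PR's SchemaReconciliation may be structured differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UnionByNameSketch {
    enum Type { INTEGER, LONG, DOUBLE, DATETIME, DATE_NANOS, KEYWORD }

    /** Returns the widened type, or null when no lossless widening exists. */
    static Type widen(Type a, Type b) {
        if (a == b) return a;
        if (a == Type.INTEGER && (b == Type.LONG || b == Type.DOUBLE)) return b;
        if (b == Type.INTEGER && (a == Type.LONG || a == Type.DOUBLE)) return a;
        if ((a == Type.DATETIME && b == Type.DATE_NANOS) || (a == Type.DATE_NANOS && b == Type.DATETIME)) {
            return Type.DATE_NANOS;
        }
        return null; // includes LONG vs DOUBLE, which is rejected as lossy
    }

    /** Merges schemas by (case-sensitive) column name, keeping first-seen column order. */
    static Map<String, Type> reconcile(Map<String, Map<String, Type>> schemasByFile) {
        Map<String, Type> unified = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Type>> file : schemasByFile.entrySet()) {
            for (Map.Entry<String, Type> column : file.getValue().entrySet()) {
                Type seen = unified.get(column.getKey());
                Type merged = seen == null ? column.getValue() : widen(seen, column.getValue());
                if (merged == null) {
                    throw new IllegalArgumentException("Incompatible types for column [" + column.getKey()
                        + "] in file [" + file.getKey() + "]: " + seen + " vs " + column.getValue());
                }
                unified.put(column.getKey(), merged);
            }
        }
        return unified;
    }

    /** Columns of the unified schema that a given file lacks and must NULL-fill. */
    static List<String> missingColumns(Map<String, Type> unified, Map<String, Type> fileSchema) {
        List<String> missing = new ArrayList<>();
        for (String name : unified.keySet()) {
            if (!fileSchema.containsKey(name)) missing.add(name);
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Type> a = new LinkedHashMap<>();
        a.put("id", Type.INTEGER);
        Map<String, Type> b = new LinkedHashMap<>();
        b.put("id", Type.LONG);
        b.put("name", Type.KEYWORD);
        Map<String, Map<String, Type>> files = new LinkedHashMap<>();
        files.put("a.parquet", a);
        files.put("b.parquet", b);
        Map<String, Type> unified = reconcile(files);
        System.out.println(unified + " missing in a.parquet: " + missingColumns(unified, a));
    }
}
```

A strict strategy would instead compare each file's schema to the first one and fail on any difference, without attempting to widen.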
The default remains first_file_wins for backward compatibility.
Design rationale
Informed by a survey of DuckDB, Spark, ClickHouse, and Cribl; see
SCHEMA_RECONCILIATION_DESIGN.md. Both strategies scan all file metadata
eagerly at planning time (footer reads only, parallelized with bounded
concurrency). This also collects per-file statistics for aggregate pushdown
Phase 2 at zero extra cost.
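The bounded-concurrency footer reads can be sketched with a semaphore that caps the number of in-flight reads. This is a simplified, blocking illustration: the value of `MAX_PARALLEL_METADATA_READS`, the `readFooter` callback, and the class name are assumptions, and the real ExternalSourceResolver may well be asynchronous rather than blocking the submitting thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

final class BoundedMetadataReads {
    // Assumed value for illustration; the PR names the constant but not its value here.
    static final int MAX_PARALLEL_METADATA_READS = 4;

    /** Reads metadata for every file, never running more than the cap at once. */
    static <T> List<T> readAll(List<String> files, Function<String, T> readFooter, ExecutorService executor) {
        Semaphore permits = new Semaphore(MAX_PARALLEL_METADATA_READS);
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (String file : files) {
            permits.acquireUninterruptibly(); // wait for a free slot before submitting
            futures.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return readFooter.apply(file);
                } finally {
                    permits.release(); // free the slot even if the read fails
                }
            }, executor));
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            results.add(future.join()); // results come back in file order
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<String> files = Arrays.asList("a.parquet", "b.parquet", "c.parquet");
            // The callback is a placeholder for a real footer read.
            System.out.println(readAll(files, String::length, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping results in file order makes the reconciliation step deterministic regardless of which read finishes first.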
Type widening uses a custom schemaWiden() rather than
EsqlDataTypeConverter.commonType(), because commonType() allows lossy
LONG→DOUBLE. Column matching is case-sensitive, consistent with ESQL's field
resolution semantics.
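To make the "lossy above 2^53" point concrete: a double has a 53-bit significand, so not every long survives a round trip through double, while every 32-bit integer does. A small check, with an illustrative class name:

```java
final class WideningLossSketch {
    /** True when converting the long to double and back yields the same value. */
    static boolean survivesDoubleRoundTrip(long value) {
        return (long) (double) value == value;
    }

    public static void main(String[] args) {
        long limit = 1L << 53;
        System.out.println("2^53     exact: " + survivesDoubleRoundTrip(limit));
        System.out.println("2^53 + 1 exact: " + survivesDoubleRoundTrip(limit + 1));
        System.out.println("int max  exact: " + survivesDoubleRoundTrip(Integer.MAX_VALUE));
    }
}
```

This is why INTEGER→DOUBLE can be treated as safe while LONG→DOUBLE cannot.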
Developed with AI-assisted tooling