ESQL: Distributed execution for external data sources
External data sources (EXTERNAL command) currently execute entirely on the coordinator — single driver, single thread, no parallelism. For large datasets, especially with aggregations, this creates a throughput bottleneck and concentrated memory pressure on the coordinator.
This meta issue tracks the work to add proper distribution and parallelism to external source execution. The approach introduces a split-based execution model where external sources are partitioned into independent units of work (ExternalSplit), discovered through a pluggable SplitProvider, and optionally distributed across data nodes using the existing exchange infrastructure.
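As a rough illustration of the split model described above, the following Java sketch shows a split as an independent unit of work, a provider that discovers splits, and a simple assignment of splits to nodes. All names here (SketchSplit, SketchSplitProvider, SplitAssignmentSketch) are hypothetical and do not reflect the actual ExternalSplit or SplitProvider signatures. Round-robin assignment is also an assumption made for the example, not necessarily what the implementation does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one unit of work; not the real ExternalSplit class. */
final class SketchSplit {
    final String path;
    final long lengthInBytes;

    SketchSplit(String path, long lengthInBytes) {
        this.path = path;
        this.lengthInBytes = lengthInBytes;
    }
}

/** Hypothetical stand-in for split discovery; not the real SplitProvider signature. */
interface SketchSplitProvider {
    List<SketchSplit> discover(String sourcePattern);
}

final class SplitAssignmentSketch {
    private SplitAssignmentSketch() {}

    /** Assigns splits to nodes round-robin; with a single node, every split stays on that node. */
    static Map<String, List<SketchSplit>> assign(List<SketchSplit> splits, List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        // LinkedHashMap keeps the node order stable for readable output.
        Map<String, List<SketchSplit>> assignment = new LinkedHashMap<>();
        for (String node : nodes) {
            assignment.put(node, new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            assignment.get(nodes.get(i % nodes.size())).add(splits.get(i));
        }
        return assignment;
    }
}
```

Passing a single-element node list models the "keep on coordinator" case, while a longer list models distribution across data nodes.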
Key characteristics:
- Pluggable partition detection: Hive-style key=value paths auto-detected by default, bare directory layouts (Kinesis Firehose, CloudTrail, etc.) supported via {name} path templates in WITH configuration. Partition-aware glob rewriting reduces file listing scope before any data is read.
- Virtual partition columns: partition values from file paths exposed as queryable columns (appended at tail, same position as metadata columns). On name conflict with data columns, path-derived values take precedence.
- Three-level filter model: partition pruning (at split discovery), data filter pushdown (per-node, translated to native format), engine filter (remainder in plan)
- Pluggable distribution strategy with basic query analysis (distribute when aggregations are present and multiple splits exist; keep on coordinator for simple queries where distribution overhead is not justified)
- ES|QL Expression objects (already NamedWriteable) serve as the canonical filter representation across nodes, so there is no need to serialize connector-native filter types
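The two partition detection modes listed above can be sketched roughly as follows. This is a minimal illustration, assuming partition values are taken from directory segments only. The class and method names (PartitionPathSketch, detectHive, detectTemplate) are hypothetical and are not the PartitionDetector API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: hypothetical helpers, not the Elasticsearch implementation. */
final class PartitionPathSketch {
    private PartitionPathSketch() {}

    /** Hive-style detection: each directory segment of the form key=value yields a partition column. */
    static Map<String, String> detectHive(String path) {
        Map<String, String> partitions = new LinkedHashMap<>();
        String[] segments = path.split("/");
        // The last segment is the file name, so it is skipped.
        for (int i = 0; i < segments.length - 1; i++) {
            String segment = segments[i];
            int eq = segment.indexOf('=');
            // Require a non-empty key and a non-empty value.
            if (eq > 0 && eq < segment.length() - 1) {
                partitions.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return partitions;
    }

    /** Template detection: a "{name}" segment binds to the path segment at the same position. */
    static Map<String, String> detectTemplate(String template, String path) {
        Map<String, String> partitions = new LinkedHashMap<>();
        String[] templateSegments = template.split("/");
        String[] pathSegments = path.split("/");
        for (int i = 0; i < templateSegments.length && i < pathSegments.length; i++) {
            String t = templateSegments[i];
            if (t.length() > 2 && t.startsWith("{") && t.endsWith("}")) {
                partitions.put(t.substring(1, t.length() - 1), pathSegments[i]);
            }
        }
        return partitions;
    }
}
```

For example, detectHive("logs/year=2024/month=03/part-0.parquet") yields year=2024 and month=03, while detectTemplate("logs/{year}/{month}/{day}", "logs/2024/03/15/file.json.gz") binds year, month, and day for a bare directory layout. The resulting values are what a partition pruning step or a virtual column would consume.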
Steps
- ExternalSplit (NamedWriteable), SplitProvider, FileSplit; Hive-style partition column detection from file paths during glob expansion; partition filter hint extraction from parsed plan; glob pattern rewriting for =, >=, and IN predicates (ESQL: Add split SPI, partition detection, and filter hint extraction #143005)
- ExternalSourceExec carries splits; split discovery wired into ComputeService after physical planning; L1 partition pruning with full expression evaluation against path-derived partition values (ESQL: Add split discovery and distribution for external sources #143114)
- PartitionDetector interface with Hive, template, and auto-detect implementations; TemplatePartitionDetector for bare directory layouts via {name} path templates; partition columns added to schema as virtual columns; VirtualColumnInjector produces ConstantBlocks per partition value per output Page at the source operator level (ESQL: Add pluggable partition detection and virtual columns #143120)
- ExternalSliceQueue; LocalExecutionPlanner creates multiple drivers per external source; each driver processes a different split; operator factories reworked for per-split operation (ESQL: Add local parallelism and partition detection for external sources #143154)
- ExternalDistributionStrategy (pluggable); AdaptiveStrategy with basic query analysis; NodeEligibilityStrategy hook; Mapper inserts ExchangeExec for distributable external sources; pragma external_distribution (ESQL: Add distribution strategy for external sources #143194)
- DataNodeRequest carries ExternalSplit assignments; DataNodeComputeHandler external source branch; FilterPushdownRegistry wired on data nodes for per-node L2 filter translation; end-to-end distributed execution through existing exchange infrastructure (ESQL: Add data node execution for external sources #143209)
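The local parallelism step, where several drivers each take a different split from a shared queue, might look roughly like the sketch below. It uses plain JDK concurrency utilities as a stand-in. SliceQueueSketch and its run method are hypothetical names, and the real ExternalSliceQueue and driver machinery may work quite differently.

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative only: models drivers as threads that pull split paths from one shared queue. */
final class SliceQueueSketch {
    private SliceQueueSketch() {}

    /** Runs driverCount workers over the splits and returns which worker processed each split path. */
    static Map<String, Integer> run(List<String> splitPaths, int driverCount) {
        Queue<String> queue = new ConcurrentLinkedQueue<>(splitPaths);
        Map<String, Integer> processedBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(driverCount);
        for (int d = 0; d < driverCount; d++) {
            final int driverId = d;
            pool.execute(() -> {
                String split;
                // poll() removes the head atomically, so no two drivers receive the same split.
                while ((split = queue.poll()) != null) {
                    // A real driver would read and process the split here.
                    processedBy.put(split, driverId);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                throw new IllegalStateException("drivers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for drivers", e);
        }
        return processedBy;
    }
}
```

Because each split is handed out exactly once, the number of drivers can be tuned independently of the number of splits. Which driver ends up with which split depends on thread scheduling and is not deterministic.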