ESQL: Add local parallelism and partition detection for external sources#143154
Merged
costin merged 8 commits intoelastic:mainfrom Feb 26, 2026
Merged
ESQL: Add local parallelism and partition detection for external sources#143154costin merged 8 commits intoelastic:mainfrom
costin merged 8 commits intoelastic:mainfrom
Conversation
Collaborator
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
Collaborator
|
Hi @costin, I've created a changelog YAML for you. |
6 tasks
External sources previously executed on a single driver. This change adds multi-driver parallelism by introducing an ExternalSliceQueue that distributes splits across drivers, mirroring the LuceneSliceQueue pattern used for Lucene-based sources. - ExternalSliceQueue: thread-safe split queue using AtomicInteger - LocalExecutionPlanner: creates slice queue and sets DATA_PARALLELISM instance count to min(splitCount, taskConcurrency) when splits > 1 - AsyncExternalSourceOperatorFactory: claims splits from queue, reads one file per split with per-split partition value injection - ExternalSourceOperatorFactory: sync SliceQueueSourceOperator variant - AsyncConnectorSourceOperatorFactory: slice queue support scaffolding - SourceOperatorContext: carries optional ExternalSliceQueue to factories Developed using AI-assisted tooling
…tting The planExternalSource method was not passing partitionColumnNames to SourceOperatorContext, preventing VirtualColumnInjector from being created in the slice queue path. Derive partition column names from FileSet.partitionMetadata() and pass them through the builder. - LocalExecutionPlanner: extract partitionColumnNames from FileSet - Spotless formatting applied to merged files Developed using AI-assisted tooling
Use pattern variable lit instead of re-casting in FileSplitProvider IN-expression evaluation. Replace manual null checks with Check.notNull in SourceOperatorContext compact constructor. Developed using AI-assisted tooling
Remove unused 6-arg expandCommaSeparated overload from GlobExpander. In VirtualColumnInjector, replace manual null/empty checks with Check.isTrue and Check.notNull, and demote partitionColumnNames from a field to a constructor-local variable. Developed using AI-assisted tooling
cfe2f85 to
c912db4
Compare
The multi-driver tests used Runnable::run as the async executor, causing a deadlock when the buffer filled up — producer and consumer ran on the same thread. Use a separate CachedThreadPool for async reads. Also wrap thread pool shutdown in try/finally to prevent thread leaks on assertion failures, and remove the CyclicBarrier which added unnecessary timing sensitivity. Developed using AI-assisted tooling
PeteGillinElastic
pushed a commit
to PeteGillinElastic/elasticsearch
that referenced
this pull request
Feb 27, 2026
…ces (elastic#143154) Add local parallelism for external sources. - ExternalSliceQueue: thread-safe split queue using AtomicInteger, analogous to LuceneSliceQueue for Lucene-based sources - LocalExecutionPlanner: creates slice queue and sets DATA_PARALLELISM instance count to min(splitCount, taskConcurrency) when splits > 1; wires partitionColumnNames from FileSet.partitionMetadata() - AsyncExternalSourceOperatorFactory: claims splits from queue, reads one file per split with per-split partition value injection - ExternalSourceOperatorFactory: sync SliceQueueSourceOperator variant - AsyncConnectorSourceOperatorFactory: slice queue support scaffolding - SourceOperatorContext: carries optional ExternalSliceQueue and partitionColumnNames to operator factories Relates elastic#142996 Developed using AI-assisted tooling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Add local parallelism for external sources and pluggable partition
detection with virtual column injection.
analogous to LuceneSliceQueue for Lucene-based sources
instance count to min(splitCount, taskConcurrency) when splits > 1;
wires partitionColumnNames from FileSet.partitionMetadata()
one file per split with per-split partition value injection
partitionColumnNames to operator factories
Relates #142996
Developed using AI-assisted tooling