ESQL: Add local parallelism and partition detection for external sources #143154

Merged
costin merged 8 commits into elastic:main from costin:esql/ds-distributed/stage-3
Feb 26, 2026

@costin (Member) commented Feb 26, 2026

Add local parallelism for external sources and pluggable partition
detection with virtual column injection.

  • ExternalSliceQueue: thread-safe split queue using AtomicInteger,
    analogous to LuceneSliceQueue for Lucene-based sources
  • LocalExecutionPlanner: creates slice queue and sets DATA_PARALLELISM
    instance count to min(splitCount, taskConcurrency) when splits > 1;
    wires partitionColumnNames from FileSet.partitionMetadata()
  • AsyncExternalSourceOperatorFactory: claims splits from queue, reads
    one file per split with per-split partition value injection
  • ExternalSourceOperatorFactory: sync SliceQueueSourceOperator variant
  • AsyncConnectorSourceOperatorFactory: slice queue support scaffolding
  • SourceOperatorContext: carries optional ExternalSliceQueue and
    partitionColumnNames to operator factories
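
The queue described above can be pictured with a minimal sketch. This is not the actual `ExternalSliceQueue` source; the class name `SplitQueueSketch`, its method names, and the use of `null` as the "exhausted" signal are assumptions made for illustration. The only detail taken from the description is that an `AtomicInteger` hands out splits so that each one is claimed by exactly one driver.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the PR's ExternalSliceQueue; not the Elasticsearch class. */
final class SplitQueueSketch<T> {
    private final List<T> splits;
    // Index of the next unclaimed split; shared by all drivers.
    private final AtomicInteger next = new AtomicInteger();

    SplitQueueSketch(List<T> splits) {
        this.splits = List.copyOf(splits); // immutable snapshot, rejects null elements
    }

    /** Returns the next unclaimed split, or null once every split has been claimed. */
    T nextSplit() {
        // Saturate at size() so repeated calls after exhaustion cannot overflow the counter.
        int i = next.getAndUpdate(n -> Math.min(n + 1, splits.size()));
        return i < splits.size() ? splits.get(i) : null;
    }

    int totalSplits() {
        return splits.size();
    }
}
```

Because the claim is a single atomic read-and-advance, two drivers calling `nextSplit()` concurrently never receive the same split, and no lock is needed.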

Relates #142996

Developed using AI-assisted tooling

@costin added the >enhancement, Team:Analytics (meta label for the analytical engine team: ESQL/Aggs/Geo), :Analytics/ES|QL (AKA ESQL), and v9.4.0 labels Feb 26, 2026
@costin requested a review from bpintea February 26, 2026 15:12
@elasticsearchmachine (Collaborator)

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine (Collaborator)

Hi @costin, I've created a changelog YAML for you.

External sources previously executed on a single driver. This change
adds multi-driver parallelism by introducing an ExternalSliceQueue
that distributes splits across drivers, mirroring the LuceneSliceQueue
pattern used for Lucene-based sources.

- ExternalSliceQueue: thread-safe split queue using AtomicInteger
- LocalExecutionPlanner: creates slice queue and sets DATA_PARALLELISM
  instance count to min(splitCount, taskConcurrency) when splits > 1
- AsyncExternalSourceOperatorFactory: claims splits from queue, reads
  one file per split with per-split partition value injection
- ExternalSourceOperatorFactory: sync SliceQueueSourceOperator variant
- AsyncConnectorSourceOperatorFactory: slice queue support scaffolding
- SourceOperatorContext: carries optional ExternalSliceQueue to factories
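
The driver-count rule in the list above, min(splitCount, taskConcurrency) when there is more than one split, can be sketched as follows. Everything here is illustrative: `DriverParallelismSketch`, `driverCount`, and `runDrivers` are hypothetical names, plain threads stand in for ES|QL drivers, and falling back to a single driver when there is at most one split is an assumption based on the earlier single-driver behavior.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DriverParallelismSketch {
    /** Assumed rule: one driver for zero or one split, else min(splitCount, taskConcurrency). */
    static int driverCount(int splitCount, int taskConcurrency) {
        return splitCount > 1 ? Math.min(splitCount, taskConcurrency) : 1;
    }

    /** Starts driverCount workers that drain a shared index; returns the splits they processed. */
    static Set<String> runDrivers(List<String> splits, int taskConcurrency) {
        int drivers = driverCount(splits.size(), taskConcurrency);
        AtomicInteger next = new AtomicInteger();
        Set<String> processed = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(drivers);
        try {
            for (int d = 0; d < drivers; d++) {
                pool.execute(() -> {
                    int i;
                    // Each worker keeps claiming splits until the shared index runs past the end.
                    while ((i = next.getAndIncrement()) < splits.size()) {
                        processed.add(splits.get(i));
                    }
                });
            }
        } finally {
            pool.shutdown(); // stop accepting work; already submitted workers still finish
        }
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("drivers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for drivers", e);
        }
        return processed;
    }
}
```

Capping the count at the number of splits avoids starting drivers that would find the queue empty on their first claim.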

Developed using AI-assisted tooling

…tting

The planExternalSource method was not passing partitionColumnNames to
SourceOperatorContext, preventing VirtualColumnInjector from being
created in the slice queue path. Derive partition column names from
FileSet.partitionMetadata() and pass them through the builder.
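
To make the idea of per-split partition value injection concrete, here is a rough sketch. It assumes a Hive-style `key=value` directory layout, which the commit message does not state; `PartitionInjectionSketch` and both of its methods are hypothetical and do not reflect the real `VirtualColumnInjector` or `FileSet.partitionMetadata()` APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionInjectionSketch {
    /** Extracts key=value directory segments from a path (Hive-style layout is an assumption). */
    static Map<String, String> partitionValues(String path) {
        Map<String, String> values = new LinkedHashMap<>();
        String[] segments = path.split("/");
        // The last segment is the file name, so it is never treated as a partition.
        for (int i = 0; i < segments.length - 1; i++) {
            int eq = segments[i].indexOf('=');
            if (eq > 0) {
                values.put(segments[i].substring(0, eq), segments[i].substring(eq + 1));
            }
        }
        return values;
    }

    /** Appends one constant value per partition column to every row read from a split. */
    static List<List<Object>> inject(
        List<List<Object>> rows,
        List<String> partitionColumnNames,
        Map<String, String> values
    ) {
        List<List<Object>> out = new ArrayList<>(rows.size());
        for (List<Object> row : rows) {
            List<Object> widened = new ArrayList<>(row);
            for (String name : partitionColumnNames) {
                widened.add(values.get(name)); // null when the split lacks that partition key
            }
            out.add(widened);
        }
        return out;
    }
}
```

The sketch also shows why the column names have to reach the operator factory: without them the reader has no way to know which virtual columns to append, which matches the bug this commit describes.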

- LocalExecutionPlanner: extract partitionColumnNames from FileSet
- Spotless formatting applied to merged files

Developed using AI-assisted tooling

Use pattern variable lit instead of re-casting in FileSplitProvider
IN-expression evaluation. Replace manual null checks with Check.notNull
in SourceOperatorContext compact constructor.
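
The two refactorings named here are standard Java idioms and can be illustrated in isolation. The types below are hypothetical (`LiteralSketch`, `ContextSketch`, `PatternVariableSketch`), and `Objects.requireNonNull` stands in for the project's `Check.notNull` helper, whose exact signature is not shown in this PR text.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical literal node, standing in for an expression inside an IN list. */
record LiteralSketch(Object value) {}

/** Hypothetical context record; the compact constructor validates before fields are assigned. */
record ContextSketch(List<String> partitionColumnNames) {
    ContextSketch {
        // Objects.requireNonNull is a stand-in for the PR's Check.notNull helper.
        Objects.requireNonNull(partitionColumnNames, "partitionColumnNames must not be null");
        partitionColumnNames = List.copyOf(partitionColumnNames);
    }
}

final class PatternVariableSketch {
    /** Returns the literal's value, or null for any non-literal expression. */
    static Object literalValue(Object expr) {
        // Before: if (expr instanceof LiteralSketch) { return ((LiteralSketch) expr).value(); }
        if (expr instanceof LiteralSketch lit) {
            return lit.value(); // 'lit' is already typed, so no second cast is needed
        }
        return null;
    }
}
```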

Developed using AI-assisted tooling

Remove unused 6-arg expandCommaSeparated overload from GlobExpander.
In VirtualColumnInjector, replace manual null/empty checks with
Check.isTrue and Check.notNull, and demote partitionColumnNames
from a field to a constructor-local variable.

Developed using AI-assisted tooling

@costin force-pushed the esql/ds-distributed/stage-3 branch from cfe2f85 to c912db4 Compare February 26, 2026 16:17
@costin enabled auto-merge (squash) February 26, 2026 16:19
@bpintea (Contributor) left a comment

🤖 says 🆗

The multi-driver tests used Runnable::run as the async executor,
causing a deadlock when the buffer filled up — producer and consumer
ran on the same thread. Use a separate CachedThreadPool for async
reads. Also wrap thread pool shutdown in try/finally to prevent
thread leaks on assertion failures, and remove the CyclicBarrier
which added unnecessary timing sensitivity.
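
The deadlock described here can be reproduced with a toy model, sketched below under stated assumptions: a bounded `ArrayBlockingQueue` plays the role of the async buffer, and `AsyncReadSketch`, `readAll`, and `readWithDedicatedPool` are hypothetical names rather than the actual test or operator code. With `Runnable::run` as the executor, the producer runs inline on the caller's thread, so once the buffer is full its `put` blocks and the consumer loop on that same thread is never reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncReadSketch {
    private static final int END = -1; // sentinel marking the end of the stream

    /** The producer fills a bounded buffer via the executor; the calling thread consumes. */
    static List<Integer> readAll(Executor asyncExecutor, int items, int bufferSize) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferSize);
        // With Runnable::run this call does not return until the producer is done,
        // which never happens if items + 1 exceeds bufferSize.
        asyncExecutor.execute(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    buffer.put(i); // blocks while the buffer is full
                }
                buffer.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        List<Integer> out = new ArrayList<>();
        try {
            for (int v = buffer.take(); v != END; v = buffer.take()) {
                out.add(v);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
        return out;
    }

    /** Safe variant: the producer runs on its own thread and the pool is always shut down. */
    static List<Integer> readWithDedicatedPool(int items, int bufferSize) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            return readAll(pool, items, bufferSize);
        } finally {
            pool.shutdownNow(); // runs even if an assertion or exception escapes
        }
    }
}
```

Calling `readAll(Runnable::run, 3, 8)` still works because the buffer never fills, which is why such a test setup can pass for small inputs and hang only when the data outgrows the buffer.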

Developed using AI-assisted tooling

@costin merged commit 0ccb3ba into elastic:main Feb 26, 2026
35 checks passed
@costin deleted the esql/ds-distributed/stage-3 branch February 26, 2026 22:13
PeteGillinElastic pushed a commit to PeteGillinElastic/elasticsearch that referenced this pull request Feb 27, 2026
…ces (elastic#143154)
