ESQL: External source parallel execution and distribution#143349

Merged
costin merged 1 commit into elastic:main from costin:ws-unified/external-source-parallel-execution
Mar 2, 2026

Conversation

@costin
Member

@costin costin commented Mar 1, 2026

Adds parallel execution, graceful degradation, cost-aware distribution,
and sub-file splitting for external data sources. This enables ESQL to
distribute external source queries across data nodes with resilience
and load balancing.
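
A minimal sketch of graceful degradation with per-node isolation, assuming each node's work can be modeled as a `Callable`. `PartialResult` and `IsolatedRunner` are hypothetical names for illustration and are not classes from this PR. The sketch runs tasks sequentially for simplicity; a real handler would run them concurrently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical result holder: rows from healthy nodes plus the failures that were isolated.
record PartialResult<T>(List<T> rows, Map<String, Exception> failures) {
    boolean isPartial() {
        return failures.isEmpty() == false;
    }
}

// Hypothetical runner; it only illustrates the isolation idea, not the PR's DataNodeComputeHandler.
final class IsolatedRunner {
    private IsolatedRunner() {}

    static <T> PartialResult<T> runAll(Map<String, Callable<List<T>>> tasksByNode) {
        List<T> rows = new ArrayList<>();
        Map<String, Exception> failures = new LinkedHashMap<>();
        for (Map.Entry<String, Callable<List<T>>> entry : tasksByNode.entrySet()) {
            try {
                // A failure on one node must not discard rows already collected from others.
                rows.addAll(entry.getValue().call());
            } catch (Exception e) {
                // Record the failure per node instead of failing the whole query.
                failures.put(entry.getKey(), e);
            }
        }
        return new PartialResult<>(rows, failures);
    }
}
```

A caller would check `isPartial()` to decide whether to surface a warning alongside the rows that were returned.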

Arrow Flight connectors can now discover multiple endpoints and read
partitions in parallel. When nodes fail during distributed execution,
partial results are returned instead of failing the entire query.
The adaptive strategy automatically selects weighted round-robin
distribution when split size information is available, balancing load
across nodes proportionally. Large row-based files (CSV, NDJSON) can
be split into byte-range chunks for finer-grained parallelism.
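
As a rough illustration of byte-range chunking, the sketch below cuts a file of known length into consecutive `(offset, length)` ranges. `ByteRange`, `ByteRangeSplitter`, and the `chunkSize` parameter are assumptions for illustration, not the PR's `FileSplitProvider` API. It also ignores record boundaries: a reader of CSV or NDJSON chunks would still need to align each range to line breaks, and how the PR handles that is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type describing one byte range of a file.
record ByteRange(long offset, long length) {}

// Hypothetical helper; not the PR's FileSplitProvider.
final class ByteRangeSplitter {
    private ByteRangeSplitter() {}

    /** Splits [0, fileLength) into consecutive ranges of at most chunkSize bytes. */
    static List<ByteRange> split(long fileLength, long chunkSize) {
        if (fileLength < 0) {
            throw new IllegalArgumentException("fileLength must be >= 0, got: " + fileLength);
        }
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be > 0, got: " + chunkSize);
        }
        List<ByteRange> ranges = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            // The last chunk may be shorter than chunkSize.
            long length = Math.min(chunkSize, fileLength - offset);
            ranges.add(new ByteRange(offset, length));
            // offset never exceeds fileLength, so this addition cannot overflow.
            offset += length;
        }
        return ranges;
    }
}
```

For example, `ByteRangeSplitter.split(10, 4)` yields three ranges covering bytes 0-3, 4-7, and 8-9.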

Relates #143327

@costin costin added the labels >enhancement, Team:Analytics (Meta label for analytical engine team (ESQL/Aggs/Geo)), :Analytics/ES|QL (AKA ESQL), v9.4.0, and ES|QL|DS (ES|QL datasources) on Mar 1, 2026
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Collaborator

Hi @costin, I've created a changelog YAML for you.

Contributor

@bpintea bpintea left a comment

LGTM, but one note on code that should probably be changed too: should we feed the offset in this call?

Comment on lines +29 to +34
if (offset < 0) {
    throw new IllegalArgumentException("offset must be >= 0, got: " + offset);
}
if (length < 0) {
    throw new IllegalArgumentException("length must be >= 0, got: " + length);
}

Check.notNull()


@Override
public InputStream newStream(long position, long rangeLength) throws IOException {
    return delegate.newStream(offset + position, rangeLength);

Should we use exact math here?

Adds graceful degradation, cost-aware distribution, sub-file splitting,
and transport serialization tests for external data sources. Builds on
the Arrow Flight parallel execution merged in elastic#143345.

- DataNodeComputeHandler: graceful degradation with per-node isolation
- WeightedRoundRobinStrategy: LPT-based cost-aware split distribution
- AdaptiveStrategy: auto-select weighted distribution when size available
- ComputeService: register weighted_round_robin strategy
- FileSplitProvider: sub-file splitting for row-based formats
- RangeStorageObject: byte-range view over StorageObject for split reads,
  with Check.notNull() validation and Math.addExact() overflow protection
- ExternalSourceOperatorFactory: RangeStorageObject integration
- FlightSplitCollectionSerializationTests: transport serialization coverage
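
The LPT (longest processing time first) heuristic named in the list above can be sketched as follows: sort splits by size in descending order, then hand each one to the node with the smallest load so far. `Split`, `NodeLoad`, and `LptAssigner` are hypothetical names for illustration; this is the textbook heuristic, not necessarily how `WeightedRoundRobinStrategy` is implemented.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical split descriptor: a name plus its estimated size.
record Split(String name, long sizeInBytes) {}

// Hypothetical per-node accumulator.
final class NodeLoad {
    final String node;
    final List<Split> splits = new ArrayList<>();
    long totalBytes;

    NodeLoad(String node) {
        this.node = node;
    }
}

final class LptAssigner {
    private LptAssigner() {}

    /** Assigns splits to nodes, largest first, always picking the least-loaded node. */
    static List<NodeLoad> assign(List<Split> splits, List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        List<NodeLoad> loads = new ArrayList<>();
        for (String node : nodes) {
            loads.add(new NodeLoad(node));
        }
        // Min-heap keyed on the bytes assigned so far.
        PriorityQueue<NodeLoad> leastLoaded = new PriorityQueue<>(
            Comparator.comparingLong((NodeLoad load) -> load.totalBytes)
        );
        leastLoaded.addAll(loads);

        List<Split> sorted = new ArrayList<>(splits);
        sorted.sort(Comparator.comparingLong(Split::sizeInBytes).reversed());

        for (Split split : sorted) {
            // Remove, update, and re-insert so the heap sees the new load.
            NodeLoad target = leastLoaded.poll();
            target.splits.add(split);
            target.totalBytes += split.sizeInBytes();
            leastLoaded.add(target);
        }
        return loads;
    }
}
```

With split sizes 7, 5, 4, 3, and 1 on two nodes, each node ends up with 10 bytes; which node receives which splits depends on how ties are broken.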

Developed using AI-assisted tooling
@costin costin force-pushed the ws-unified/external-source-parallel-execution branch from 951be6d to 0f78ca7 Compare March 2, 2026 18:52
@costin
Member Author

costin commented Mar 2, 2026

Addressed both review comments:

  1. Check.notNull() — Added Check.notNull(delegate, "delegate must not be null") in the RangeStorageObject constructor.
  2. Exact math — Changed offset + position to Math.addExact(offset, position) to guard against overflow.
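
A self-contained sketch of the two fixes, under stated assumptions: `RangeReadable` is a hypothetical stand-in for `StorageObject` that models only the `newStream` method quoted in the review, `RangeView` is a hypothetical stand-in for `RangeStorageObject`, and `Objects.requireNonNull` substitutes for `Check.notNull`, which is not available outside the Elasticsearch codebase and may throw a different exception type.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

// Hypothetical stand-in for StorageObject; only the method quoted in the review is modeled.
interface RangeReadable {
    InputStream newStream(long position, long rangeLength) throws IOException;
}

// Hypothetical stand-in for RangeStorageObject: a view starting at a fixed offset of a delegate.
final class RangeView implements RangeReadable {
    private final RangeReadable delegate;
    private final long offset;
    private final long length;

    RangeView(RangeReadable delegate, long offset, long length) {
        // Objects.requireNonNull is used here in place of Check.notNull (an assumption).
        this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0, got: " + offset);
        }
        if (length < 0) {
            throw new IllegalArgumentException("length must be >= 0, got: " + length);
        }
        this.offset = offset;
        this.length = length;
    }

    long length() {
        return length;
    }

    @Override
    public InputStream newStream(long position, long rangeLength) throws IOException {
        // Math.addExact throws ArithmeticException instead of silently wrapping to a negative position.
        return delegate.newStream(Math.addExact(offset, position), rangeLength);
    }
}
```

The sketch does not check that `position + rangeLength` stays within the view's `length`; whether the real class does is not stated in this thread.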

Also rebased onto latest main and resolved the conflict in ExternalSourceOperatorFactory.java (integrated RangeStorageObject wrapping into the refactored openFileSplit() method).

@costin costin disabled auto-merge March 2, 2026 19:20
@costin costin merged commit fe17bcd into elastic:main Mar 2, 2026
20 of 35 checks passed
@costin costin deleted the ws-unified/external-source-parallel-execution branch March 2, 2026 19:21
szybia added a commit to szybia/elasticsearch that referenced this pull request Mar 2, 2026
…locations

* upstream/main: (94 commits)
  Mute org.elasticsearch.xpack.esql.qa.mixed.EsqlClientYamlIT test {p0=esql/40_tsdb/TS Command grouping on text field} elastic#142544
  Mute org.elasticsearch.index.store.StoreDirectoryMetricsIT testDirectoryMetrics elastic#143419
  Mute org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT test elastic#143023
  TS_INFO information retrieval command (elastic#142721)
  ESQL: External source parallel execution and distribution (elastic#143349)
  Mute org.elasticsearch.index.mapper.blockloader.FlattenedFieldRootBlockLoaderTests testBlockLoaderForFieldInObject {preference=Params[syntheticSource=false, preference=DOC_VALUES]} elastic#143414
  Mute org.elasticsearch.index.mapper.blockloader.FlattenedFieldRootBlockLoaderTests testBlockLoaderForFieldInObject {preference=Params[syntheticSource=false, preference=NONE]} elastic#143413
  Mute org.elasticsearch.index.mapper.blockloader.FlattenedFieldRootBlockLoaderTests testBlockLoaderForFieldInObject {preference=Params[syntheticSource=false, preference=STORED]} elastic#143412
  Removing ingest random sampling (elastic#143289)
  Mute org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT test elastic#143023
  [Transform] Clean up internal tests (elastic#143246)
  Skip time series field type merge for non-TS agg queries (elastic#143262)
  Enable zero-copy SIMD vector scoring on searchable snapshots (frozen tier) (elastic#141718)
  Mute org.elasticsearch.xpack.search.CrossClusterAsyncSearchIT testCancelViaExpirationOnRemoteResultsWithMinimizeRoundtrips elastic#143407
  Fix MemorySegmentUtilsTests (elastic#143391)
  Unmute testWorkflowsRestrictionAllowsAccess (elastic#143308)
  Cancel async query on expiry (elastic#143016)
  ESQL: Finish migrating error testing (elastic#143322)
  Reduce LuceneOperator.Status memory consumption with large QueryDSL queries (elastic#143175)
  ESQL: Generative testing with full text functions (elastic#142961)
  ...
tballison pushed a commit to tballison/elasticsearch that referenced this pull request Mar 3, 2026
…3349)

Labels

:Analytics/ES|QL (AKA ESQL), >enhancement, ES|QL|DS (ES|QL datasources), Team:Analytics (Meta label for analytical engine team (ESQL/Aggs/Geo)), v9.4.0

3 participants