Skip to content

[ESQL] Enable distributed pipeline breakers for external sources via FragmentExec#143696

Merged
costin merged 9 commits intoelastic:mainfrom
costin:esql/distributed-pipeline-breakers-fragmentexec
Mar 6, 2026
Merged

[ESQL] Enable distributed pipeline breakers for external sources via FragmentExec#143696
costin merged 9 commits intoelastic:mainfrom
costin:esql/distributed-pipeline-breakers-fragmentexec

Conversation

@costin
Copy link
Copy Markdown
Member

@costin costin commented Mar 5, 2026

Wrap ExternalRelation in FragmentExec (like EsRelation) so that pipeline breakers (Aggregate, Limit, TopN) above external sources are naturally distributed to data nodes via ExchangeExec.

Previously, ExternalRelation was mapped directly to ExternalSourceExec on the coordinator, bypassing the FragmentExec-based distribution that EsRelation uses. This meant all three data-node optimization stages were skipped: LocalLogicalPlanOptimizer, LocalMapper, and LocalPhysicalPlanOptimizer (including filter pushdown via FilterPushdownRegistry). The filter pushdown is especially critical since PushFiltersToSource creates an opaque pushedFilter on ExternalSourceExec that is not serializable and must be created on the same JVM where the operator runs.

With this change, on data nodes localPlan() expands the FragmentExec through LocalMapper into ExternalSourceExec, enabling local optimizations. The coordinator-only path correctly discovers and injects splits after localPlan expansion. AdaptiveStrategy now also recognizes TopN as a pipeline breaker for distribution decisions.

ReplaceFieldWithConstantOrNull is taught to retain fields from ExternalRelation (like it already does for lookup index fields) since SearchStats is empty for external sources. ExchangeExec nodes wrapping external FragmentExec are collapsed and TopNExec InputOrdering is reset when falling back to coordinator-only execution.

costin added 3 commits March 5, 2026 12:58
Use FlightServer.close() for robust gRPC shutdown and add a brief
sleep to let Netty event loop callbacks drain before the test ends.

Closes elastic#143636
…FragmentExec

Wrap ExternalRelation in FragmentExec (like EsRelation) so that pipeline
breakers (Aggregate, Limit, TopN) above external sources are naturally
distributed to data nodes via ExchangeExec. On data nodes, localPlan()
expands the FragmentExec through LocalMapper into ExternalSourceExec,
enabling local optimizations such as filter pushdown via
FilterPushdownRegistry.

Key changes:
- Make ExternalRelation serializable for transport inside FragmentExec
- Mapper wraps ExternalRelation in FragmentExec; LocalMapper expands it
- Split discovery peeks inside FragmentExec for ExternalRelation
- Coordinator-only path injects splits after localPlan expansion
- AdaptiveStrategy recognizes TopN as a pipeline breaker
@costin costin requested a review from bpintea March 5, 2026 16:35
@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label Mar 5, 2026
@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Hi @costin, I've created a changelog YAML for you.

@costin costin force-pushed the esql/distributed-pipeline-breakers-fragmentexec branch from 0e20d76 to fd734ce Compare March 5, 2026 21:31
ReplaceFieldWithConstantOrNull uses SearchStats.exists() to determine
which fields to retain. On the coordinator-only path for external
sources, SearchStats is empty (no ES search contexts), so exists()
returns false for all fields, causing the optimizer to replace all
external source fields with null.

Retain fields from ExternalRelation output in the shouldBeRetained
predicate, following the same pattern used for lookup index fields.
This preserves all local optimizations (constant folding, filter
pushdown, column pruning, etc.) while preventing incorrect field
pruning.

Also collapse ExchangeExec and reset TopNExec InputOrdering when
external source plans fall back to coordinator-only execution.
@costin costin force-pushed the esql/distributed-pipeline-breakers-fragmentexec branch from e98e7d2 to 0300fc7 Compare March 5, 2026 21:40
@elastic elastic deleted a comment from coderabbitai bot Mar 6, 2026
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 6, 2026

Important

Review skipped

Auto reviews are limited based on label configuration.

🏷️ Required labels (at least one) (2)
  • Team:Delivery
  • Team:Search - Inference

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Path: .coderabbit.yml

Review profile: CHILL

Plan: Pro

Run ID: 1b79ce2b-eac1-4f5b-9a6b-0330dcbfdb60

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@bpintea bpintea left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

@costin costin merged commit 1e67573 into elastic:main Mar 6, 2026
36 checks passed
@costin costin deleted the esql/distributed-pipeline-breakers-fragmentexec branch March 6, 2026 14:37
szybia added a commit to szybia/elasticsearch that referenced this pull request Mar 6, 2026
…locations

* upstream/main: (153 commits)
  ES|QL: Update docs for TOP_SNIPPETS and DECAY (elastic#143739)
  Correctly include endpoint id in log msg in AuthorizationPoller (elastic#143743)
  Bar searching or sorting on _seq_no when disabled (elastic#143600)
  Generalize `testClientCancellation` test (elastic#143586)
  JSON_EXTRACT: zero-copy byte slicing for object, array, and number extraction (elastic#143702)
  Track recycler pages in circuit breaker (elastic#143738)
  [ESQL] Enable distributed pipeline breakers for external sources via FragmentExec (elastic#143696)
  Adding 'mode' and 'codec' fields to ES monitoring template (elastic#143673)
  [ESQL] Columnar I/O and vectorized block conversion for external sources (elastic#143703)
  Fix flaky MMR diversification YAML tests (elastic#143706)
  ES|QL codegen: check builder arguments for vector support (elastic#143724)
  Add Views Security Model (elastic#141050)
  ESQL: Prevent pushdown of unmapped fields in filters and sorts (elastic#143460)
  Don't run seq_no pruning tests in release CI (elastic#143725)
  ESQL: Support intra-row field references in ROW command (elastic#140217)
  ES|QL: Remove implicit limit in FORK branches in CSV tests (elastic#143601)
  IndexRoutingTests with and without synthetic id (elastic#143566)
  Synthetic id upgrade test in serverless (elastic#142471)
  Disable "Review skipped" comments for PRs without specified labels (elastic#143728)
  Cleanup ES|QL T-Digest code duplication, add memory accounting (elastic#143662)
  ...
sidosera pushed a commit to sidosera/elasticsearch that referenced this pull request Mar 6, 2026
…FragmentExec (elastic#143696)

Wrap ExternalRelation in FragmentExec (like EsRelation) so that pipeline breakers (Aggregate, Limit, TopN) above external sources are naturally distributed to data nodes via ExchangeExec.

Previously, ExternalRelation was mapped directly to ExternalSourceExec on the coordinator, bypassing the FragmentExec-based distribution that EsRelation uses. This meant all three data-node optimization stages were skipped: LocalLogicalPlanOptimizer, LocalMapper, and LocalPhysicalPlanOptimizer (including filter pushdown via FilterPushdownRegistry). The filter pushdown is especially critical since PushFiltersToSource creates an opaque pushedFilter on ExternalSourceExec that is not serializable and must be created on the same JVM where the operator runs.

With this change, on data nodes localPlan() expands the FragmentExec through LocalMapper into ExternalSourceExec, enabling local optimizations. The coordinator-only path correctly discovers and injects splits after localPlan expansion. AdaptiveStrategy now also recognizes TopN as a pipeline breaker for distribution decisions.

ReplaceFieldWithConstantOrNull is taught to retain fields from ExternalRelation (like it already does for lookup index fields) since SearchStats is empty for external sources. ExchangeExec nodes wrapping external FragmentExec are collapsed and TopNExec InputOrdering is reset when falling back to coordinator-only execution.

Developed using AI-assisted tooling
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.4.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants