[ESQL] Enable distributed pipeline breakers for external sources via FragmentExec #143696
Use FlightServer.close() for robust gRPC shutdown and add a brief sleep to let Netty event loop callbacks drain before the test ends. Closes elastic#143636
…FragmentExec
Wrap ExternalRelation in FragmentExec (like EsRelation) so that pipeline breakers (Aggregate, Limit, TopN) above external sources are naturally distributed to data nodes via ExchangeExec. On data nodes, localPlan() expands the FragmentExec through LocalMapper into ExternalSourceExec, enabling local optimizations such as filter pushdown via FilterPushdownRegistry.
Key changes:
- Make ExternalRelation serializable for transport inside FragmentExec
- Mapper wraps ExternalRelation in FragmentExec; LocalMapper expands it
- Split discovery peeks inside FragmentExec for ExternalRelation
- Coordinator-only path injects splits after localPlan expansion
- AdaptiveStrategy recognizes TopN as a pipeline breaker
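The wrap-then-expand flow described above can be illustrated with a toy model. This is a minimal sketch, not the Elasticsearch implementation: the records below only borrow the names from the description, the `map` and `localMap` methods are hypothetical stand-ins for Mapper and LocalMapper, and the source name `"example-source"` is made up. It assumes a pipeline breaker (here only Limit) gets a partial copy inside the fragment and a final copy above the exchange.

```java
public class FragmentSketch {
    // Toy logical nodes (simplified stand-ins, not the real ES|QL classes).
    interface LogicalPlan {}
    record ExternalRelation(String source) implements LogicalPlan {}
    record Filter(LogicalPlan child, String condition) implements LogicalPlan {}
    record Limit(LogicalPlan child, int n) implements LogicalPlan {}

    // Toy physical nodes.
    interface PhysicalPlan {}
    record FragmentExec(LogicalPlan fragment) implements PhysicalPlan {}
    record ExchangeExec(PhysicalPlan child) implements PhysicalPlan {}
    record LimitExec(PhysicalPlan child, int n) implements PhysicalPlan {}
    record FilterExec(PhysicalPlan child, String condition) implements PhysicalPlan {}
    record ExternalSourceExec(String source) implements PhysicalPlan {}

    // Coordinator side: keep the external relation logical by wrapping it in a fragment.
    static PhysicalPlan map(LogicalPlan plan) {
        if (plan instanceof ExternalRelation rel) {
            return new FragmentExec(rel);
        }
        if (plan instanceof Filter f) {
            PhysicalPlan child = map(f.child());
            if (child instanceof FragmentExec frag) {
                // Non-breaking nodes stay inside the fragment and travel to the data node.
                return new FragmentExec(new Filter(frag.fragment(), f.condition()));
            }
            return new FilterExec(child, f.condition());
        }
        if (plan instanceof Limit l) {
            PhysicalPlan child = map(l.child());
            if (child instanceof FragmentExec frag) {
                // Pipeline breaker: partial limit inside the fragment, exchange in between.
                child = new ExchangeExec(new FragmentExec(new Limit(frag.fragment(), l.n())));
            }
            return new LimitExec(child, l.n()); // final limit on the coordinator
        }
        throw new IllegalArgumentException("unsupported node: " + plan);
    }

    // Data-node side: expand the logical fragment into physical operators.
    static PhysicalPlan localMap(LogicalPlan plan) {
        if (plan instanceof ExternalRelation rel) {
            return new ExternalSourceExec(rel.source());
        }
        if (plan instanceof Filter f) {
            return new FilterExec(localMap(f.child()), f.condition());
        }
        if (plan instanceof Limit l) {
            return new LimitExec(localMap(l.child()), l.n());
        }
        throw new IllegalArgumentException("unsupported node: " + plan);
    }

    public static void main(String[] args) {
        LogicalPlan query = new Limit(new Filter(new ExternalRelation("example-source"), "x > 1"), 10);
        PhysicalPlan coordinatorPlan = map(query);
        System.out.println("coordinator: " + coordinatorPlan);

        // The data node receives the fragment and expands it locally.
        LimitExec top = (LimitExec) coordinatorPlan;
        FragmentExec fragment = (FragmentExec) ((ExchangeExec) top.child()).child();
        System.out.println("data node:   " + localMap(fragment.fragment()));
    }
}
```

Because the filter is still a logical node when it reaches the data node in this model, any source-specific pushdown could be decided there, which matches the motivation given in the description.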
Pinging @elastic/es-analytical-engine (Team:Analytics)
Hi @costin, I've created a changelog YAML for you.
ReplaceFieldWithConstantOrNull uses SearchStats.exists() to determine which fields to retain. On the coordinator-only path for external sources, SearchStats is empty (no ES search contexts), so exists() returns false for all fields, causing the optimizer to replace all external source fields with null.

Retain fields from ExternalRelation output in the shouldBeRetained predicate, following the same pattern used for lookup index fields. This preserves all local optimizations (constant folding, filter pushdown, column pruning, etc.) while preventing incorrect field pruning.

Also collapse ExchangeExec and reset TopNExec InputOrdering when external source plans fall back to coordinator-only execution.
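The field-retention fix can be sketched as follows. This is an illustrative model only: the `SearchStats` interface here is a one-method stand-in, the `Field` record and its `fromExternalRelation` flag are hypothetical, and the real `shouldBeRetained` predicate in ReplaceFieldWithConstantOrNull may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class RetainExternalFieldsSketch {
    // One-method stand-in for the stats lookup; the real SearchStats has a richer API.
    interface SearchStats {
        boolean exists(String fieldName);
    }

    // Hypothetical field model: the flag marks fields that come from an ExternalRelation.
    record Field(String name, boolean fromExternalRelation) {}

    // External fields are always retained; other fields must exist according to the stats.
    static Predicate<Field> shouldBeRetained(SearchStats stats) {
        return field -> field.fromExternalRelation() || stats.exists(field.name());
    }

    // Replace every non-retained field with the literal "null", keeping the rest by name.
    static List<String> replaceMissingWithNull(List<Field> fields, SearchStats stats) {
        Predicate<Field> retain = shouldBeRetained(stats);
        List<String> result = new ArrayList<>();
        for (Field field : fields) {
            result.add(retain.test(field) ? field.name() : "null");
        }
        return result;
    }

    public static void main(String[] args) {
        // Coordinator-only path: no search contexts, so exists() is false for everything.
        SearchStats emptyStats = name -> false;
        List<Field> fields = List.of(new Field("price", true), new Field("missing_es_field", false));
        System.out.println(replaceMissingWithNull(fields, emptyStats)); // prints [price, null]
    }
}
```

Without the `fromExternalRelation()` clause, both fields in this example would be replaced with null, which is the incorrect pruning the commit describes.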
…locations
* upstream/main: (153 commits)
  ES|QL: Update docs for TOP_SNIPPETS and DECAY (elastic#143739)
  Correctly include endpoint id in log msg in AuthorizationPoller (elastic#143743)
  Bar searching or sorting on _seq_no when disabled (elastic#143600)
  Generalize `testClientCancellation` test (elastic#143586)
  JSON_EXTRACT: zero-copy byte slicing for object, array, and number extraction (elastic#143702)
  Track recycler pages in circuit breaker (elastic#143738)
  [ESQL] Enable distributed pipeline breakers for external sources via FragmentExec (elastic#143696)
  Adding 'mode' and 'codec' fields to ES monitoring template (elastic#143673)
  [ESQL] Columnar I/O and vectorized block conversion for external sources (elastic#143703)
  Fix flaky MMR diversification YAML tests (elastic#143706)
  ES|QL codegen: check builder arguments for vector support (elastic#143724)
  Add Views Security Model (elastic#141050)
  ESQL: Prevent pushdown of unmapped fields in filters and sorts (elastic#143460)
  Don't run seq_no pruning tests in release CI (elastic#143725)
  ESQL: Support intra-row field references in ROW command (elastic#140217)
  ES|QL: Remove implicit limit in FORK branches in CSV tests (elastic#143601)
  IndexRoutingTests with and without synthetic id (elastic#143566)
  Synthetic id upgrade test in serverless (elastic#142471)
  Disable "Review skipped" comments for PRs without specified labels (elastic#143728)
  Cleanup ES|QL T-Digest code duplication, add memory accounting (elastic#143662)
  ...
…FragmentExec (elastic#143696)

Wrap ExternalRelation in FragmentExec (like EsRelation) so that pipeline breakers (Aggregate, Limit, TopN) above external sources are naturally distributed to data nodes via ExchangeExec.

Previously, ExternalRelation was mapped directly to ExternalSourceExec on the coordinator, bypassing the FragmentExec-based distribution that EsRelation uses. This meant all three data-node optimization stages were skipped: LocalLogicalPlanOptimizer, LocalMapper, and LocalPhysicalPlanOptimizer (including filter pushdown via FilterPushdownRegistry). The filter pushdown is especially critical since PushFiltersToSource creates an opaque pushedFilter on ExternalSourceExec that is not serializable and must be created on the same JVM where the operator runs.

With this change, on data nodes localPlan() expands the FragmentExec through LocalMapper into ExternalSourceExec, enabling local optimizations. The coordinator-only path correctly discovers and injects splits after localPlan expansion. AdaptiveStrategy now also recognizes TopN as a pipeline breaker for distribution decisions.

ReplaceFieldWithConstantOrNull is taught to retain fields from ExternalRelation (like it already does for lookup index fields) since SearchStats is empty for external sources. ExchangeExec nodes wrapping external FragmentExec are collapsed and TopNExec InputOrdering is reset when falling back to coordinator-only execution.

Developed using AI-assisted tooling
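The coordinator-only fallback mentioned in the commit message can be pictured as a small tree rewrite. The sketch below is an assumption-laden toy: the records only reuse the names from the description, the `InputOrdering` values `SORTED` and `NOT_SORTED` are hypothetical, and `toCoordinatorOnly` is an invented helper, so the actual rule in the PR may differ in both shape and conditions.

```java
public class CollapseExchangeSketch {
    // Hypothetical ordering flag; the real TopNExec InputOrdering values may differ.
    enum InputOrdering { SORTED, NOT_SORTED }

    // Toy plan nodes named after the ones in the description.
    interface Plan {}
    record FragmentExec(String source) implements Plan {}
    record ExchangeExec(Plan child) implements Plan {}
    record TopNExec(Plan child, int limit, InputOrdering inputOrdering) implements Plan {}

    // Rewrite a distributed plan so that it can run entirely on the coordinator.
    static Plan toCoordinatorOnly(Plan plan) {
        if (plan instanceof ExchangeExec exchange && exchange.child() instanceof FragmentExec fragment) {
            // Nothing is sent over the wire any more, so the exchange is dropped.
            return fragment;
        }
        if (plan instanceof ExchangeExec other) {
            return new ExchangeExec(toCoordinatorOnly(other.child()));
        }
        if (plan instanceof TopNExec topN) {
            // This sketch simply resets the ordering flag, mirroring the description.
            return new TopNExec(toCoordinatorOnly(topN.child()), topN.limit(), InputOrdering.NOT_SORTED);
        }
        return plan;
    }

    public static void main(String[] args) {
        Plan distributed = new TopNExec(new ExchangeExec(new FragmentExec("example-source")), 10, InputOrdering.SORTED);
        System.out.println(toCoordinatorOnly(distributed));
    }
}
```

In this model only an exchange that directly wraps a fragment is removed; exchanges above other nodes are kept and their children rewritten recursively.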