Skip to content

ESQL: Fix ExchangeSinkExec output type#142489

Merged
alex-spies merged 21 commits intoelastic:mainfrom
GalLalouche:fix/exchange_exec_in_node_reduce
Feb 25, 2026
Merged

ESQL: Fix ExchangeSinkExec output type#142489
alex-spies merged 21 commits intoelastic:mainfrom
GalLalouche:fix/exchange_exec_in_node_reduce

Conversation

@GalLalouche
Copy link
Copy Markdown
Contributor

This PR fixes ExchangeSinkExec claiming to return the wrong output. This doesn't actually affect production, since the operator outputs whatever it receives anyway, but it's confusing, and looks bad in golden tests as well.
Also added a plan verification that only runs during tests to ensure this kind of thing doesn't happen again.

Resolves #141654.

@elasticsearchmachine elasticsearchmachine added v9.4.0 needs:triage Requires assignment of a team area label labels Feb 13, 2026
@GalLalouche GalLalouche added >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL >refactoring and removed >non-issue labels Feb 13, 2026
@elasticsearchmachine elasticsearchmachine removed the needs:triage Requires assignment of a team area label label Feb 13, 2026
@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@GalLalouche GalLalouche marked this pull request as draft February 13, 2026 16:18
@GalLalouche GalLalouche marked this pull request as ready for review February 13, 2026 21:24
@GalLalouche GalLalouche force-pushed the fix/exchange_exec_in_node_reduce branch from 15d0c04 to 932cf31 Compare February 13, 2026 21:47
@GalLalouche GalLalouche force-pushed the fix/exchange_exec_in_node_reduce branch from 932cf31 to 6ed1ae0 Compare February 23, 2026 08:56
ExchangeSinkExec[[emp_no{f}, hire_date{f}, salary{f}],false]
\_ProjectExec[[_doc{f}, birth_date{r} AS hire_date]]
\_EvalExec[[null[DATETIME] AS birth_date]]
ExchangeSinkExec[[_doc{f}, hire_date{r}],false]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: With and without my test fix, we have an interesting situation here: the exchange doesn't return a field attribute anymore, but a reference attribute (because we intentionally shadowed the hire_date with a null constant from an EVAL that was inserted for the missing field.

This is correct, but means we cannot assume that attribute type remains consistent for an exchange after optimization.

Copy link
Copy Markdown
Contributor

@alex-spies alex-spies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done reviewing now. The productive changes look alright, except for the automatic "fix my exchanges, pls" in ComputeService.java. This shouldn't really be necessary and looks like a leftover.

The exchange sinks are already being updated in ComputeService.java.
Let's keep the exchange sink update constrained to one place.
The consistency checks in PlannerUtils#reductionPlan are already
checking that late materialization planning didn't mess up the data and
reduce plans' consistency.
Copy link
Copy Markdown
Contributor

@alex-spies alex-spies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I simplified the PR a bit. If the tests still pass, we should be good to go.

Copy link
Copy Markdown
Contributor

@alex-spies alex-spies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, in addition to the FragmentExec "lying" about its output, I can see that TimeSeriesAggregateExec has some peculiarities as well.

I'd like to try and fix those before merging this, so we don't go and artificially declare factually inconsistent plans as consistent. Update: changed my mind; getting the exchanges consistent for aggs is surprisingly complex, given how we generate intermediate attributes. I think it's not worth fixing for now.

Found 1 problem
line -1:-1: Plan [TimeSeriesAggregateExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55],[MIN($$cpu$MINOVERTIME$min_over_time(cpu){f$}#63,true[BOOLEAN],
PT0S[TIME_DURATION]) AS MINOVERTIME_$1#64, DIMENSIONVALUES(cluster{f}#60,true[BOOLEAN],PT0S[TIME_DURATION]) AS cluster#65, bucket(@timestamp, 1 hour){r}#55],
INTERMEDIATE,[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{
r}#67, $$cluster$values{r}#68],66,BUCKET(@timestamp{f}#59,PT1H[TIME_DURATION])]] optimized incorrectly due to missing references [$$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{r}#67, $$cluster$values{r}#68]
java.lang.IllegalStateException: failed for plan
ExchangeSinkExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{r}#67, $$clu
ster$values{r}#68],true]
\_TimeSeriesAggregateExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55],[MIN($$cpu$MINOVERTIME$min_over_time(cpu){f$}#63,true[BOOLEAN],
PT0S[TIME_DURATION]) AS MINOVERTIME_$1#64, DIMENSIONVALUES(cluster{f}#60,true[BOOLEAN],PT0S[TIME_DURATION]) AS cluster#65, bucket(@timestamp, 1 hour){r}#55],
INTERMEDIATE,[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{
r}#67, $$cluster$values{r}#68],66,BUCKET(@timestamp{f}#59,PT1H[TIME_DURATION])]
  \_ExchangeSourceExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#56, $$MINOVERTIME_$1$seen{r}#57, $$clu
ster$values{r}#58],true]

That actually genuinely needs to stay untouched.
Only fix late materialization planning.

Only check consistency for non-agg plans; aggs remain inconsistent for
now.
@alex-spies
Copy link
Copy Markdown
Contributor

Ok, I ended up reducing scope a little. There was a workaround in place to deal with aggs, I removed it, but excluded aggs from the consistency check for exchanges. The intermediate attributes for aggs and time series aggs make this just too complicated to be worth the effort.

  • I reduced the exchange sink fix to just the known broken sinks in case of late materialization.
  • I hardened the consistency check to ensure the overall output didn't change and the data driver outputs what the reduce driver requires.
  • I fixed a test that was considering too many fields as missing.

Hope CI likes this now.

FragmentExec updatedFragmentExec = fragmentExec.withFragment(updatedFragment);
// TODO This ignores the possible change in output, see #141654
ExchangeSinkExec updatedDataPlan = originalPlan.replaceChild(updatedFragmentExec);
ExchangeSinkExec updatedDataPlan = originalPlan.replaceChildAndUpdateOutput(updatedFragmentExec);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main fix here.

@alex-spies alex-spies self-assigned this Feb 25, 2026
@alex-spies alex-spies merged commit 3e34e66 into elastic:main Feb 25, 2026
35 checks passed
smalyshev pushed a commit to smalyshev/elasticsearch that referenced this pull request Feb 25, 2026
* Fix ExchangeSinkExec output for node-level late materialization.
* Add a consistency check for node-level reduction and data node plans.

---------

Co-authored-by: Alexander Spies <alexander.spies@elastic.co>
Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
@GalLalouche
Copy link
Copy Markdown
Contributor Author

@alex-spies Thanks a lot for chaperoning this to production!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL >refactoring Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.4.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ESQL: Incorrect ExchangeExecs in node reduce plan

4 participants