ESQL: Fix ExchangeSinkExec output type#142489
Conversation
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
Show resolved
Hide resolved
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
Outdated
Show resolved
Hide resolved
15d0c04 to
932cf31
Compare
932cf31 to
6ed1ae0
Compare
...enTests/testTopNWithMissingSortField/local_reduce_physical_optimization_data_driver.expected
Outdated
Show resolved
Hide resolved
| ExchangeSinkExec[[emp_no{f}, hire_date{f}, salary{f}],false] | ||
| \_ProjectExec[[_doc{f}, birth_date{r} AS hire_date]] | ||
| \_EvalExec[[null[DATETIME] AS birth_date]] | ||
| ExchangeSinkExec[[_doc{f}, hire_date{r}],false] |
There was a problem hiding this comment.
thought: With and without my test fix, we have an interesting situation here: the exchange doesn't return a field attribute anymore, but a reference attribute (because we intentionally shadowed the hire_date with a null constant from an EVAL that was inserted for the missing field.
This is correct, but means we cannot assume that attribute type remains consistent for an exchange after optimization.
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
Outdated
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java
Show resolved
Hide resolved
...enTests/testTopNWithMissingSortField/local_reduce_physical_optimization_data_driver.expected
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
Outdated
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
Outdated
Show resolved
Hide resolved
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
Outdated
Show resolved
Hide resolved
The exchange sinks are already being updated in ComputeService.java. Let's keep the exchange sink update constrained to one place.
The consistency checks in PlannerUtils#reductionPlan are already checking that late materialization planning didn't mess up the data and reduce plans' consistency.
alex-spies
left a comment
There was a problem hiding this comment.
Ok, I simplified the PR a bit. If the tests still pass, we should be good to go.
There was a problem hiding this comment.
Ok, in addition to the FragmentExec "lying" about its output, I can see that TimeSeriesAggregateExec has some peculiarities as well.
I'd like to try and fix those before merging this, so we don't go and artificially declare factually inconsistent plans as consistent. Update: changed my mind; getting the exchanges consistent for aggs is surprisingly complex, given how we generate intermediate attributes. I think it's not worth fixing for now.
Found 1 problem
line -1:-1: Plan [TimeSeriesAggregateExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55],[MIN($$cpu$MINOVERTIME$min_over_time(cpu){f$}#63,true[BOOLEAN],
PT0S[TIME_DURATION]) AS MINOVERTIME_$1#64, DIMENSIONVALUES(cluster{f}#60,true[BOOLEAN],PT0S[TIME_DURATION]) AS cluster#65, bucket(@timestamp, 1 hour){r}#55],
INTERMEDIATE,[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{
r}#67, $$cluster$values{r}#68],66,BUCKET(@timestamp{f}#59,PT1H[TIME_DURATION])]] optimized incorrectly due to missing references [$$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{r}#67, $$cluster$values{r}#68]
java.lang.IllegalStateException: failed for plan
ExchangeSinkExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{r}#67, $$clu
ster$values{r}#68],true]
\_TimeSeriesAggregateExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55],[MIN($$cpu$MINOVERTIME$min_over_time(cpu){f$}#63,true[BOOLEAN],
PT0S[TIME_DURATION]) AS MINOVERTIME_$1#64, DIMENSIONVALUES(cluster{f}#60,true[BOOLEAN],PT0S[TIME_DURATION]) AS cluster#65, bucket(@timestamp, 1 hour){r}#55],
INTERMEDIATE,[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#66, $$MINOVERTIME_$1$seen{
r}#67, $$cluster$values{r}#68],66,BUCKET(@timestamp{f}#59,PT1H[TIME_DURATION])]
\_ExchangeSourceExec[[_tsid{m}#54, bucket(@timestamp, 1 hour){r}#55, $$MINOVERTIME_$1$min{r}#56, $$MINOVERTIME_$1$seen{r}#57, $$clu
ster$values{r}#58],true]
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
Outdated
Show resolved
Hide resolved
That actually genuinely needs to stay untouched.
This reverts commit f139867.
Only fix late materialization planning. Only check consistency for non-agg plans; aggs remain inconsistent for now.
|
Ok, I ended up reducing scope a little. There was a workaround in place to deal with aggs, I removed it, but excluded aggs from the consistency check for exchanges. The intermediate attributes for aggs and time series aggs make this just too complicated to be worth the effort.
Hope CI likes this now. |
| FragmentExec updatedFragmentExec = fragmentExec.withFragment(updatedFragment); | ||
| // TODO This ignores the possible change in output, see #141654 | ||
| ExchangeSinkExec updatedDataPlan = originalPlan.replaceChild(updatedFragmentExec); | ||
| ExchangeSinkExec updatedDataPlan = originalPlan.replaceChildAndUpdateOutput(updatedFragmentExec); |
...src/test/java/org/elasticsearch/xpack/esql/plugin/LateMaterializationPlannerGoldenTests.java
Outdated
Show resolved
Hide resolved
...src/test/java/org/elasticsearch/xpack/esql/plugin/LateMaterializationPlannerGoldenTests.java
Outdated
Show resolved
Hide resolved
* Fix ExchangeSinkExec output for node-level late materialization. * Add a consistency check for node-level reduction and data node plans. --------- Co-authored-by: Alexander Spies <alexander.spies@elastic.co> Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
|
@alex-spies Thanks a lot for chaperoning this to production! |
This PR fixes
ExchangeSinkExecclaiming to return the wrong output. This doesn't actually affect production, since the operator outputs whatever it receives anyway, but it's confusing, and looks bad in golden tests as well.Also added a plan verification that only runs during tests to ensure this kind of thing doesn't happen again.
Resolves #141654.