ES|QL: Support STATS after FORK#128745
Conversation
|
Hi @ioanatia, I've created a changelog YAML for you. |
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
|
Pinging @elastic/es-search-relevance (Team:Search Relevance) |
|
I am putting back the non-issue label, because we don't want a release note for this particular PR. |
| if (aggregatorMode == AggregatorMode.INITIAL && aggregateExec.child() instanceof ExchangeSourceExec) { | ||
| if (aggregatorMode == AggregatorMode.INITIAL | ||
| && aggregateExec.child() instanceof ExchangeSourceExec exchangeSourceExec | ||
| && exchangeSourceExec.isIntermediateAgg()) { |
There was a problem hiding this comment.
This change looks fine to me, but I'd like to get another pair of eyes from @alex-spies to double check if the aggregatorMode is set properly, as this was introduced by Alex originally.
There was a problem hiding this comment.
the reason why we added this check is because a query like:
FROM test
| FORK
( WHERE content:"fox" ) // sub plan 1
( WHERE content:"dog" ) // sub plan 2
| SORT _fork
| KEEP _fork, id, content
will now be split into multiple sub plans using ExchangeSourceExec and ExchangeSinkExec:
and so we need to explicitly check with what type of ExchangeSourceExec we are dealing with.
There was a problem hiding this comment.
This looks correct to me, too. I think this corresponds to the code in PlannerUtils.reductionPlan that creates intermediate aggregations in the node-level reduction steps; as far as I can see, in this case it's always the case that exchangeSourceExec.isIntermediateAgg is set to true because this only applies when the exchange is between 2 aggregation steps.
@ioanatia , sorry for this piece of code - it has history and isn't very clear. For the time being, could you maybe leave a comment like c.f. ComputeService.reductionPlan, which sets the aggregator mode to INITIAL.
There was a problem hiding this comment.
cc @dnhatn who worked on node-level reduction and related stuff more recently.
@dnhatn , it looks like this code for node-level reduction is a bit leaky, as it requires details about how reduction plans get constructed (aggregator mode INITIAL while being between aggs). I think this stems from a time where ESQL's planning code only had the INITIAL and FINAL agg modes, and only the compute engine already had the INTERMEDIATE one. Nhat, what's your take on us refactoring PlannerUtils.reductionPlan here and setting the agg mode to INTERMEDIATE immediately? Then this weird code block in the AbstractPhysicalOperationProviders that was edited here shouldn't even be necessary, right?
Edit: draft PR with the suggested simplification: #128980; let's see what CI says about this.
There was a problem hiding this comment.
thank you for the context @alex-spies
I can def wait for the refactor PR to get in, but I see that there are some CI failures that seem related in the draft PR.
So I don't think we should block this PR until the refactor gets in, because it sounds like the change here looks correct to both you and @fang-xing-esql .
Do you agree?
There was a problem hiding this comment.
The node-level reduction should create an INTERMEDIATE aggregation. I'm not sure why we made it INITIAL. I have approved your proposal.
afoucret
left a comment
There was a problem hiding this comment.
I do not have all the context but these change are making sense to me.
|
@alex-spies should we merge this one #128980 first? we merged latest main and it looks like CI is happy now, except for the Serverless checks failure which does not seem related. then my PR should hopefully be just about adding extra tests to catch regressions |
related: #121950
It was reported that using STATS immediately after FORK results in an error.
Example:
ends up with
The problem has to do with how we set the
aggregatorModeinAbstractPhysicalOperationProviders.javagroupingPhysicalOperation. We just assume that if theAggregateExecfollows aExchangeSourceExec, we should set theaggregatorModetoINTERMEDIARYbecause we are on the data node reduce step.With the introduction of FORK and #126389 this is no longer always true, so we explicitly check if the
ExchangeSourceExecis indeed between aggregations.