Fixing remote ENRICH by pushing the Enrich inside FragmentExec#114665
smalyshev merged 9 commits into elastic:main
Hi @smalyshev, I've created a changelog YAML for you.
Pinging @elastic/es-analytical-engine (Team:Analytics)
💔 Backport failed. The backport operation could not be completed. You can use sqren/backport to backport manually.
💚 All backports created successfully.
…ic#114665) * Fixing remote ENRICH by pushing the Enrich inside FragmentExec * Improve handling of more complex cases such as several enriches (cherry picked from commit e789039)
costin left a comment
Due to a merge conflict I had the chance to look at the code added in this PR, and I have some serious concerns about the Mapper code.
I have a hard time following what the code does, but the transformation looks scary to me.
    var child = map(enrich.child());
    AtomicBoolean hasFragment = new AtomicBoolean(false);

    var childTransformed = child.transformUp((f) -> {
        // Once we reached FragmentExec, we stuff our Enrich under it
        if (f instanceof FragmentExec) {
            hasFragment.set(true);
            return new FragmentExec(p);
        }
        if (f instanceof EnrichExec enrichExec) {
            // It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
            assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
            return enrichExec.child();
        }
        if (f instanceof UnaryExec unaryExec) {
            if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
                return f;
            } else {
                return unaryExec.child();
            }
        }
        // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
        return f;
    });

    if (hasFragment.get()) {
        return childTransformed;
    }
}
This section of code needs to be integrated with the one below (lines 112-115, about Enrich and coordinator mode).
    // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).

    var child = map(enrich.child());
    AtomicBoolean hasFragment = new AtomicBoolean(false);
Use Holder<Boolean> hasFragment = new Holder<>(false);
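The suggestion above swaps one mutable box for another. A minimal sketch of why a box is needed at all, and of what the swap looks like, follows. The `Holder` class here is a hypothetical stand-in written for this sketch (a plain single-value container with `get` and `set`), not the class from the Elasticsearch code base, and the list of node names is invented for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a single-value mutable container (assumption: get/set only).
final class Holder<T> {
    private T value;

    Holder(T value) {
        this.value = value;
    }

    T get() {
        return value;
    }

    void set(T value) {
        this.value = value;
    }
}

final class HolderSketch {
    // Records from inside a lambda whether a node named "FragmentExec" was seen.
    static boolean containsFragment(List<String> nodeNames) {
        // A local boolean cannot be reassigned inside the lambda because captured
        // locals must be effectively final, so the flag lives in a mutable box.
        Holder<Boolean> hasFragment = new Holder<>(false);
        nodeNames.forEach(name -> {
            if (name.equals("FragmentExec")) {
                hasFragment.set(true);
            }
        });
        return hasFragment.get();
    }

    // Same logic with AtomicBoolean, as in the code under review.
    static boolean containsFragmentAtomic(List<String> nodeNames) {
        AtomicBoolean hasFragment = new AtomicBoolean(false);
        nodeNames.forEach(name -> {
            if (name.equals("FragmentExec")) {
                hasFragment.set(true);
            }
        });
        return hasFragment.get();
    }
}
```

Both variants behave the same in single-threaded code. The plain holder simply avoids implying that atomicity matters here.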
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
    // When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
    // We're only going to do it on the coordinator node.
    // The way we're going to do it is as follows:
    // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
    // 2. Put this Enrich under it, removing everything that was below it previously.
    // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
    // FragmentExec.
    // 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
    // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).

    var child = map(enrich.child());
This is similar to the code below under UnaryPlan, var child = map().. // line 150.
Yes, this part so far is, but the later part diverges.
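The numbered steps in the code comment can be followed on a toy plan. The sketch below is not the Elasticsearch Mapper: `Logical`, `Phys`, `UnaryExec`, `LeafExec`, `FragmentExec`, `transformUp`, and the example plan shape are all simplified, hypothetical stand-ins, and the special case for `EnrichExec` in ANY mode is folded into the generic "drop non-kept unary nodes" branch. It only illustrates the rewrite: the fragment is replaced with one that wraps the whole logical Enrich subtree, the listed pipeline breakers stay above it, and other unary nodes above the fragment are dropped because they already live inside the logical plan.

```java
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;

// Toy logical plan node: a named operator with at most one child (null marks the leaf).
record Logical(String name, Logical child) {
    String describe() {
        return child == null ? name : name + "(" + child.describe() + ")";
    }
}

// Toy physical plan: a unary operator, a plain leaf, or a FragmentExec leaf wrapping a logical plan.
interface Phys {
    String describe();
}

record UnaryExec(String name, Phys child) implements Phys {
    public String describe() {
        return name + "(" + child.describe() + ")";
    }
}

record LeafExec(String name) implements Phys {
    public String describe() {
        return name;
    }
}

record FragmentExec(Logical fragment) implements Phys {
    public String describe() {
        return "FragmentExec[" + fragment.describe() + "]";
    }
}

final class RemoteEnrichSketch {
    // Operators kept above the fragment, following step 5 of the comment under review.
    static final Set<String> KEPT = Set.of("LimitExec", "ExchangeExec", "OrderExec", "TopNExec");

    // Bottom-up rewrite: the child is transformed first, then the rule sees the rebuilt node.
    static Phys transformUp(Phys node, UnaryOperator<Phys> rule) {
        if (node instanceof UnaryExec u) {
            return rule.apply(new UnaryExec(u.name(), transformUp(u.child(), rule)));
        }
        return rule.apply(node);
    }

    // Returns the rewritten plan, or empty when there is no FragmentExec (step 1: do nothing).
    static Optional<Phys> pushEnrichIntoFragment(Phys mappedChild, Logical enrich) {
        boolean[] hasFragment = { false };
        Phys transformed = transformUp(mappedChild, node -> {
            if (node instanceof FragmentExec) {
                hasFragment[0] = true;
                // Step 2: the new fragment wraps the whole logical Enrich subtree.
                return new FragmentExec(enrich);
            }
            if (node instanceof UnaryExec u && KEPT.contains(u.name()) == false) {
                // Not a kept pipeline breaker: drop it, it is already part of the logical plan.
                return u.child();
            }
            return node;
        });
        return hasFragment[0] ? Optional.of(transformed) : Optional.empty();
    }

    // Hypothetical plan for something like: FROM idx | LIMIT n | WHERE cond | ENRICH _remote:policy
    static String demo() {
        Logical limit = new Logical("Limit", new Logical("Relation", null));
        Logical enrich = new Logical("Enrich", new Logical("Filter", limit));
        Phys mappedChild = new UnaryExec("FilterExec",
            new UnaryExec("LimitExec", new UnaryExec("ExchangeExec", new FragmentExec(limit))));
        return pushEnrichIntoFragment(mappedChild, enrich).map(Phys::describe).orElse("no fragment");
    }
}
```

In this toy run, `FilterExec` disappears from the coordinator side, while `Filter` is still present inside the fragment's logical plan under `Enrich`, and `LimitExec` and `ExchangeExec` remain above the fragment.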
// Unary Plan
//
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
    // When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
Remote typically implies a remote cluster. I think you mean data node (or, in ESQL terminology, local, as in local planning).
Yes, here it is meant that remote enrich would go to the remote cluster. It may also effectively run on the local cluster if the enrich policy and indexes are there, but the important part, I think, is that it would also go to the remote.
    // We're only going to do it on the coordinator node.
    // The way we're going to do it is as follows:
    // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
    // 2. Put this Enrich under it, removing everything that was below it previously.
"removing everything that was below it previously"
This most certainly sounds like a no-go. Do you mean insert the node under FragmentExec, and maybe the node below?
Everything that is under the Enrich will still be under it when it's inserted into the Fragment, as a logical plan (which, as I understand it, will later be converted to a physical plan when the fragment is processed).
    // The way we're going to do it is as follows:
    // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
    // 2. Put this Enrich under it, removing everything that was below it previously.
    // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
Pipeline breakers influence the exchange data transfer; if you add another node, it will break the data-node/coordinator contract.
Not sure what you mean here, could you explain?
    // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
    // FragmentExec.
    // 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
    // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
Sounds like you want Enrich to be a pipeline breaker itself.
The comments above rest on a number of incorrect assumptions and don't explain the problem the code tries to solve.
Making Enrich a pipeline breaker won't help us much by itself. The problem is that, because of how mapping works right now, the Mapper has (had, before the patch) no ability to place Enrich inside the fragment (and thus execute it remotely) if any pipeline breakers are present (and at least LIMIT is always present unless absorbed into TopN).
        // Once we reached FragmentExec, we stuff our Enrich under it
        if (f instanceof FragmentExec) {
            hasFragment.set(true);
            return new FragmentExec(p);
I'm not sure why this works!
This overrides the subplan on the data node with the enrich plan from up the tree, which sits on the coordinator, essentially overriding the pipeline boundary regardless of what's underneath.
This needs to be changed.
It basically does the same thing that is done later for non-pipeline-breakers, but with some complications.
        if (f instanceof UnaryExec unaryExec) {
            if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
                return f;
            } else {
                return unaryExec.child();
            }
I'm not sure what this is supposed to do: check whether it's a pipeline breaker and otherwise skip it?
So a FilterExec gets removed?
Yes, FilterExec will be removed from this part. The filter will be inside the Enrich plan under FragmentExec, which, when converted to a physical plan, will produce the FilterExec that gets executed.
Make Mapper place remote enrich clauses into FragmentExec and reorder the tree so all work for this enrich also happens inside FragmentExec. Some operations, such as limits and sorts, require special handling because they happen both inside and outside the fragment.
See also: #105095