ESQL: Add a HashLookupOperator #107894
Conversation
Adds an operator that takes a `Block` on construction, builds a `BlockHash`, and uses it to resolve keys to row offsets. Example time! Say we hand construct the operator with the first two columns of this `Page`:

| a | b | v |
| -:| --:| --:|
| 1 | 11 | 21 |
| 2 | 12 | 22 |
| 2 | 14 | 23 |
| 2 | 11 | 24 |

If we then fire the first two columns of this `Page` into it, we'll get the third column:

| a | b | ord |
| -----:| --:| -----:|
| 2 | 14 | 2 |
| 1 | 11 | 0 |
| 3 | 11 | null |
| [1,2] | 11 | [0,3] |

This is the first half of the `Operator` side of a hash join. The second half is looking up values from those row offsets. That'd mean adding the `v` column like so:

| a | b | ord | v |
| -----:| --:| -----:| -------:|
| 2 | 14 | 2 | 23 |
| 1 | 11 | 0 | 21 |
| 3 | 11 | null | null |
| [1,2] | 11 | [0,3] | [21,24] |

And *that* is comparatively simple.

Notice that I said this is the *Operator* side of a hash join. There's no planning or distributed execution involved. Yet. And a hash join is something you'd distribute. This `Operator` can run on a data node or a coordinating node. It doesn't care. It just needs an input.
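To make the key-to-ordinal half concrete, here's a toy sketch of what the example above computes. The real operator uses a `BlockHash` over `Block`s; a `HashMap` over `(a, b)` pairs stands in for it here, and the class and method names are illustrative, not the real API. It also doesn't model the multivalue row `[1,2], 11 → [0,3]`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the key-to-row-offset half of the hash lookup.
class ToyHashLookup {
    private final Map<List<Integer>, Integer> keyToOrd = new HashMap<>();

    // "Construction": hash each key row, remembering its row offset.
    // Duplicate keys keep the first offset, matching the single-ord TODO below.
    ToyHashLookup(int[][] keyRows) {
        for (int ord = 0; ord < keyRows.length; ord++) {
            keyToOrd.putIfAbsent(List.of(keyRows[ord][0], keyRows[ord][1]), ord);
        }
    }

    // "Lookup": resolve an incoming key row to a row offset, or null on a miss.
    Integer lookup(int a, int b) {
        return keyToOrd.get(List.of(a, b));
    }
}
```

Feeding it the four key rows from the first table reproduces the `ord` column from the second.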
Pinging @elastic/es-analytical-engine (Team:Analytics)
```java
 */
public static BlockHash buildPackedValuesBlockHash(List<GroupSpec> groups, BlockFactory blockFactory, int emitBatchSize) {
    return new PackedValuesBlockHash(groups, blockFactory, emitBatchSize);
}
```
I need to force using a `PackedValuesBlockHash` for a bit, until I work out how to handle the different ordinals. I didn't want to make the class public, so I plumbed it through here.
```java
private final int nullTrackingBytes;
private final BytesRefBuilder bytes = new BytesRefBuilder();
private final Group[] groups;
private final List<GroupSpec> specs;
```
Ok. This was really fun to figure out. If `groups` lives on the object then the `add` and `lookup` methods aren't reentrant on any particular hash, and `lookup` can't be thread safe. Now, I don't try to use it in a multi-threaded way, but I do call `lookup` from inside of a callback for `add` - at least in tests. And they worked! But in a really muddled, sneaky, scary way. This has us building the `Group` objects on the fly. This is nicer because I can make them `Releasable`. It's more objects to build on `add`, but it's safer. Less crazy-making.
```java
AddWork(Page page, GroupingAggregatorFunction.AddInput addInput, int batchSize) {
    super(blockFactory, emitBatchSize, addInput);
    initializeGroupsForPage(page, batchSize);
```
This whole method just moves into the ctor for `Group`.
```java
            Releasables.close(mapped);
        }
    }
}
```
This is pretty much the core of the `ProjectOperator` lifted into a method on `Page`. I want to project during hash lookups.
I suppose I could avoid the projection by changing the hash lookup method, but I'm not sure it's worth doing.
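A projection in this sense just keeps the blocks for the requested channels, in the requested order, and drops the rest. A minimal sketch, assuming a simplified page that holds plain `int[]` columns; `ToyProject` and `projectBlocks` are illustrative names, not the real `Page` API, and the real code also has to manage block ref-counting:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of projecting a page down to selected channels.
class ToyProject {
    static List<int[]> projectBlocks(List<int[]> blocks, int[] channels) {
        List<int[]> projected = new ArrayList<>();
        for (int channel : channels) {
            // Keep only the requested channels, in the requested order.
            projected.add(blocks.get(channel));
        }
        return projected;
    }
}
```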
```java
/**
 * Maps a single {@link Page} into zero or more resulting pages.
 */
public abstract class AbstractPageMappingToIteratorOperator implements Operator {
```
This implements most of the lookup operator. I made it a separate subclass because:
- It's easier to test that way.
- We're going to reuse it for stuff like loading columns.
- We might be able to reuse it for mv_expand. I'm not sure.
- I think it's easier to read this way - the bits about managing `Iterator<Page>` or merging `Iterator<Block>` into an existing page get to live here and there's no need to think about where that iterator comes from. I genuinely can't hold the contents of `BlockHash#lookup` and this class in my head at the same time. So this thing just works with any `Iterator`.
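The shape of that pattern, stripped of all block machinery: each input page is turned into an iterator of result pages, which the operator then drains one output call at a time. A toy sketch using plain `int[]` pages, chunked at a fixed batch size; `ToyPageChunker` and `chunk` are hypothetical names, not the real `AbstractPageMappingToIteratorOperator` API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy sketch of mapping one "page" into zero or more result pages by
// chunking it at a fixed number of positions.
class ToyPageChunker {
    static Iterator<int[]> chunk(int[] page, int batchSize) {
        List<int[]> chunks = new ArrayList<>();
        for (int from = 0; from < page.length; from += batchSize) {
            // Each chunk covers at most batchSize positions of the input page.
            chunks.add(Arrays.copyOfRange(page, from, Math.min(from + batchSize, page.length)));
        }
        return chunks.iterator();
    }
}
```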
```java
hash.add(new Page(keys), new GroupingAggregatorFunction.AddInput() {
    @Override
    public void add(int positionOffset, IntBlock groupIds) {
        // TODO support multiple rows with the same keys
```
This is a huge TODO, innocently tucked in here. And it dovetails into the block comment above about using different types for the lookup. I don't know how to solve this one at the moment. So I'm saving it for a rainy day.
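For what it's worth, the shape of the problem: today each key resolves to a single row offset, so when two rows carry the same key, later ones are invisible to lookups. Supporting duplicates means resolving a key to *all* of its row offsets, something like this toy multimap. This is purely illustrative of the semantics, not a proposed fix:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy multimap from key to every row offset that carries it - the
// behavior the TODO is about: duplicate keys must map to all their rows.
class ToyMultiOrds {
    static Map<Long, List<Integer>> buildOrds(long[] keyCol) {
        Map<Long, List<Integer>> ords = new HashMap<>();
        for (int ord = 0; ord < keyCol.length; ord++) {
            ords.computeIfAbsent(keyCol[ord], k -> new ArrayList<>()).add(ord);
        }
        return ords;
    }
}
```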
```java
    return docIds;
}
```

```java
public void testHashLookup() {
```
I've not plugged the `HashLookupOperator` into anything yet, so good old `OperatorTests` will have to do. This has been here since long before I started working on ESQL and it's kind of a junk drawer. But it'll do until we get this wired in.
```java
 * subclass that appends {@code 1} and chunks the incoming {@link Page}
 * at {@code 100} positions.
 */
public class IteratorAppendPageTests extends OperatorTestCase {
```
This is quite like the behavior of `HashLookupOperator`, but easier to reason about because the code fits on the screen.
```java
 * Tests {@link AbstractPageMappingToIteratorOperator} against a test
 * subclass that removes every other page.
 */
public class IteratorRemovePageTests extends OperatorTestCase {
```
No other extensions to `AbstractPageMappingToIteratorOperator` can yield an empty `Iterator`. But this feels like an important edge case to cover.
```java
for (int i = 0; i < p.getBlockCount(); i++) {
    resultBlocks.add(p.getBlock(i));
}
```
I ended up here because I was debugging something I'd broken, and this was hard to read because it was doing stuff it doesn't really have to do. That `resultBlocks` list was just built and thrown away. I don't recall what it was for, but we don't need it.
```java
public class HashLookupOperator extends AbstractPageMappingToIteratorOperator {
    /**
     * Factory for {@link HashLookupOperator}. It's received {@link Block}s
```
Eventually, we need to release these blocks. Let's keep them as they are until we integrate with the planning.
Yeah! I've been playing around with limiting the size of the blocks and not tracking them. That's "simpler", but it's not good. It might be the right thing to do to get us moving.
This adds support for `LOOKUP`, a command that implements a sort of
inline `ENRICH`, using data that is passed in the request:
```
$ curl -uelastic:password -HContent-Type:application/json -XPOST \
'localhost:9200/_query?error_trace&pretty&format=txt' \
-d'{
"query": "ROW a=1::LONG | LOOKUP t ON a",
"tables": {
"t": {
"a:long": [ 1, 4, 2],
"v1:integer": [ 10, 11, 12],
"v2:keyword": ["cat", "dog", "wow"]
}
},
"version": "2024.04.01"
}'
v1 | v2 | a
---------------+---------------+---------------
10 |cat |1
```
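Conceptually, that request runs the whole pipeline described above: hash the inline table's key column, resolve each input key to a row offset, then fetch the value columns at that offset. A toy end-to-end sketch reproducing the example's result; `ToyLookup` and its signature are illustrative, not the ESQL implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Toy end-to-end LOOKUP: key column -> row offset -> value columns.
class ToyLookup {
    static String lookup(long[] keyCol, int[] v1Col, String[] v2Col, long key) {
        // Build phase: hash the table's key column to row offsets.
        Map<Long, Integer> ords = new HashMap<>();
        for (int ord = 0; ord < keyCol.length; ord++) {
            ords.putIfAbsent(keyCol[ord], ord);
        }
        // Lookup phase: resolve the key, then fetch values at that offset.
        Integer ord = ords.get(key);
        return ord == null ? null : v1Col[ord] + "|" + v2Col[ord];
    }
}
```

With the table from the request (`a: [1, 4, 2]`, `v1: [10, 11, 12]`, `v2: ["cat", "dog", "wow"]`), key `1` resolves to row 0 and yields `10|cat`, matching the `v1 | v2` columns in the response.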
This required these PRs:
* #107624
* #107634
* #107701
* #107762
* #107923
* #107894
* #107982
* #108012
* #108020
* #108169
* #108191
* #108334
* #108482
* #108696
* #109040
* #109045
Closes #107306