ESQL: Serialize Blocks in plan #108334
This adds support for serializing `Block`s as part of the plan. Unlike serializing blocks in the normal stream, we attempt to prevent sending duplicate blocks. This'll make it easier to serialize blocks in the plan without worrying about duplicates.
Pinging @elastic/es-analytical-engine (Team:Analytics)
dnhatn left a comment
I left some comments, but feel free to merge this. Thanks Nik!
@Override
public int readArraySize() throws IOException {
    return super.readArraySize();
Gotta make it public! I'll leave a comment.
I've reworked it so I don't need to make this public.
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java
 */
public final class PlanStreamOutput extends StreamOutput {

    private final Map<Block, BytesReference> cachedBlocks = new IdentityHashMap<>();
I have some concerns about using Block as the key since we'll need to read all values in hashCode and equals. However, if you're okay with that, I'm fine too.
It's an IdentityHashMap - just the object itself.
I'll leave a comment about how that's important. It'd be expensive to use the Block as a regular key.
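To illustrate the point about identity keys, here is a minimal sketch. `FakeBlock` and `IdentityDedupWriter` are hypothetical stand-ins, not classes from this PR, and the `new:`/`ref:` strings are an invented wire format used only to make the behavior visible. The sketch assumes a block whose `hashCode` and `equals` walk every value, which is the cost the review comment is worried about.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a Block: equals/hashCode read every value. */
final class FakeBlock {
    final long[] values;
    int hashCodeCalls = 0; // counts how often the expensive hashCode ran

    FakeBlock(long... values) {
        this.values = values;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof FakeBlock && Arrays.equals(values, ((FakeBlock) o).values);
    }

    @Override
    public int hashCode() {
        hashCodeCalls++;
        return Arrays.hashCode(values);
    }
}

/** Hypothetical writer that deduplicates by object identity only. */
final class IdentityDedupWriter {
    // IdentityHashMap compares keys with == and uses System.identityHashCode,
    // so FakeBlock.hashCode() and equals() are never invoked.
    private final Map<FakeBlock, Integer> seen = new IdentityHashMap<>();
    final List<String> wire = new ArrayList<>();

    void writeBlock(FakeBlock block) {
        Integer id = seen.get(block);
        if (id != null) {
            wire.add("ref:" + id); // same instance seen before: send a back-reference
            return;
        }
        id = seen.size();
        seen.put(block, id);
        wire.add("new:" + id + ":" + block.values.length); // first sighting: send it in full
    }
}
```

With this approach two distinct instances holding equal values are both sent in full. That is the trade-off: no content comparison, at the price of only catching duplicates that are literally the same object.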
}
Block[] blocks = new Block[count];
for (int i = 0; i < blocks.length; i++) {
    blocks[i] = in.readBlock();
I know we don't track memory here, but releasing blocks in case of exceptions would make our life easier in the future.
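One way to follow this suggestion is the usual success-flag pattern around the read loop. The sketch below is an assumption about how that could look, not the code that was merged: `ReleasableBlock`, `BlockReader`, and `BlockArrayReader` are hypothetical types that stand in for `Block` and the stream's `readBlock()`.

```java
import java.io.IOException;

/** Hypothetical minimal block that only records whether it was released. */
final class ReleasableBlock implements AutoCloseable {
    boolean released = false;

    @Override
    public void close() {
        released = true;
    }
}

/** Hypothetical reader standing in for the stream's readBlock(). */
interface BlockReader {
    ReleasableBlock readBlock() throws IOException;
}

final class BlockArrayReader {
    /** Reads count blocks; if any read fails, releases the blocks read so far. */
    static ReleasableBlock[] readBlocks(BlockReader in, int count) throws IOException {
        ReleasableBlock[] blocks = new ReleasableBlock[count];
        boolean success = false;
        try {
            for (int i = 0; i < blocks.length; i++) {
                blocks[i] = in.readBlock();
            }
            success = true;
            return blocks;
        } finally {
            if (success == false) {
                // Slots after the failing index are still null, so skip them.
                for (ReleasableBlock block : blocks) {
                    if (block != null) {
                        block.close();
                    }
                }
            }
        }
    }
}
```

The original exception still propagates to the caller; the `finally` block only makes sure partially read blocks are not leaked.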
private static final Supplier<LongFunction<NameId>> DEFAULT_NAME_ID_FUNC = NameIdMapper::new;

private final Map<Integer, Block> cachedBlocks = new HashMap<>();
Let's decouple this from the stream, similar to NameIdMapper. That way we could support both streams that don't do any caching and those that do.
How common is it that we'll have duplicate blocks? Comparing their content will take CPU and I wonder if being opportunistic about reusing them (when we know it's the same data) isn't an alternative for at least some cases.
In the plan I think we're fairly likely to have duplicates - at least, that's how I want to build the serialization. Let the streaming deal with the duplicates.
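As a rough picture of "let the streaming deal with the duplicates", the sketch below round-trips a list of blocks through a byte array. Everything in it is an assumption made for illustration: `DedupRoundTrip`, the `NEW`/`REF` tags, and the use of `long[]` in place of `Block` are hypothetical and do not reflect the actual wire format in `PlanStreamOutput` or `PlanStreamInput`. The writer keys its cache by identity; the reader keys its cache by the integer id it was sent, mirroring the `Map<Integer, Block>` shown in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical round trip: long[] stands in for Block. */
final class DedupRoundTrip {
    private static final byte NEW = 0; // full block follows
    private static final byte REF = 1; // back-reference to an earlier block

    static byte[] write(List<long[]> blocks) throws IOException {
        Map<long[], Integer> seen = new IdentityHashMap<>(); // identity, not content
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(blocks.size());
            for (long[] block : blocks) {
                Integer id = seen.get(block);
                if (id != null) {
                    out.writeByte(REF);
                    out.writeInt(id);
                    continue;
                }
                id = seen.size();
                seen.put(block, id);
                out.writeByte(NEW);
                out.writeInt(id);
                out.writeInt(block.length);
                for (long v : block) {
                    out.writeLong(v);
                }
            }
        }
        return bytes.toByteArray();
    }

    static List<long[]> read(byte[] data) throws IOException {
        Map<Integer, long[]> cache = new HashMap<>(); // id -> already decoded block
        List<long[]> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                byte tag = in.readByte();
                int id = in.readInt();
                if (tag == REF) {
                    long[] cached = cache.get(id);
                    if (cached == null) {
                        throw new IOException("unknown block id " + id);
                    }
                    result.add(cached); // reuse the instance decoded earlier
                } else {
                    long[] block = new long[in.readInt()];
                    for (int j = 0; j < block.length; j++) {
                        block[j] = in.readLong();
                    }
                    cache.put(id, block);
                    result.add(block);
                }
            }
        }
        return result;
    }
}
```

In this sketch, code that builds the plan can hand the same block to the writer as often as it likes; only the first occurrence pays for the full payload, and the reader hands back the same decoded instance for each back-reference.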
alex-spies left a comment
LGTM, just minor comments.
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java
...ql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/local/ImmediateLocalSupplier.java
static LocalSupplier of(Block[] blocks) {
    return new ImmediateLocalSupplier(blocks);
}
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java
Co-authored-by: Alexander Spies <alexander.spies@elastic.co>
…into esql_serialize_blocks_in_plan
Test passed.....
This adds support for `LOOKUP`, a command that implements a sort of
inline `ENRICH`, using data that is passed in the request:
```
$ curl -uelastic:password -HContent-Type:application/json -XPOST \
    'localhost:9200/_query?error_trace&pretty&format=txt' \
-d'{
    "query": "ROW a=1::LONG | LOOKUP t ON a",
    "tables": {
        "t": {
            "a:long":     [    1,     4,     2],
            "v1:integer": [   10,    11,    12],
            "v2:keyword": ["cat", "dog", "wow"]
        }
    },
    "version": "2024.04.01"
}'
v1 | v2 | a
---------------+---------------+---------------
10 |cat |1
```
This required these PRs:
* #107624
* #107634
* #107701
* #107762
* #107923
* #107894
* #107982
* #108012
* #108020
* #108169
* #108191
* #108334
* #108482
* #108696
* #109040
* #109045
Closes #107306