ESQL: add Arrow dataframes output format #109873
Conversation
Pinging @elastic/es-analytical-engine (Team:Analytics)
This has been on my review list for a few days; I've just been on vacation for a bunch of days. It's still on the list. Sorry for the delay!
nik9000 left a comment
I left a few comments, but I think the most important merge blockers are:
- Some kind of integration test. I guess it'd be something that uses the Arrow IPC library directly in Java to read and write some things.
- Only allowing this in SNAPSHOT builds. It's super paranoid behavior, but I think it's warranted.
What should we do about multivalued results, do you think? By the time we go to serialize these we're past the point of adding warning headers. At least for now, we could silently drop them. Or maybe log a warning in the server log.
    throw e;
} finally {
    if (output != null) {
        // assert false : "failed to write arrow chunk";
I think we should either drop the assert and the if statement entirely (close methods skip nulls precisely because of code like this; there is so much code like this), or we should uncomment it. The question is: is it always a bug if we throw an exception while serializing? Does the recycler come from Netty? If so, I think it can throw through no fault of our own.
Also! Are we sure we want to keep the catch-Exception-and-log bit? I might have added that in the prototype, but I think it's not the normal way to do this; normally we'd just let it bubble up.
Good catch. I cleaned it up in 3bb04d3 and only kept the Releasables.closeExpectNoException(output). Or should we remove that as well?
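For readers outside the codebase, the pattern being converged on here can be sketched without any ES classes. `Releasables.closeExpectNoException` is the real helper; everything else below is a hypothetical stand-in. The idea: let a serialization failure bubble up unchanged, and close the output on the way out, treating a close-time failure as a bug rather than letting it mask the original exception.

```java
import java.io.Closeable;
import java.io.IOException;

class CleanupSketch {
    // Hypothetical stand-in for Releasables.closeExpectNoException: closing is
    // expected to succeed, so a failure here is surfaced loudly instead of
    // replacing whatever exception is already in flight.
    static void closeExpectNoException(Closeable c) {
        if (c == null) {
            return; // close helpers skip nulls, as discussed above
        }
        try {
            c.close();
        } catch (IOException e) {
            throw new AssertionError("failed to close output", e);
        }
    }

    // Serialization exceptions bubble up; the output is closed either way.
    static void writeChunk(Closeable output, Runnable serialize) {
        try {
            serialize.run();
        } finally {
            closeExpectNoException(output);
        }
    }
}
```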
// FIXME: are these parameters ok here?
// FIXME: should the resulting block be closed after use?
new NoopCircuitBreaker("esql-arrow"),
BigArrays.NON_RECYCLING_INSTANCE
This is fine for a prototype, but it's a merge blocker. There are three choices I think are ok:
- Push the converter into the superclass and do it optionally, right as you dump onto the wire. This is probably the best way because it involves the least copying and memory management. It might be the slowest because the converter will go megamorphic pretty quickly. But it's shoveling a fair number of bits around so it's probably fine. Probably.
- Convert the whole block up front in a tight loop with memory management.
- Option 1 but with the generics reified out by hand - java doesn't do it for you so you need to do it with your own fingers. Or, in our case, we'd do it with stringtemplate. That's almost certainly the choice that's fastest at runtime. But I don't know that it matters for this.
Either way, this is fine for now.
I don't know much about memory management in ES. Is there an explanation somewhere, or some reference code I could take inspiration from?
It's more about memory management in the compute engine than the rest of ES. The short version is that you can use BlockFactory to make a builder like:

try (IntBlock.Builder builder = block.blockFactory().newIntBlockBuilder(block.getPositionCount())) {
    for (int i = 0; i < block.getPositionCount(); i++) {
        builder.appendInt(block.getInt(i));
    }
    try (IntBlock built = builder.build()) {
        super.render(built);
    }
}

That'll account for the new block using the same circuit breakers as the original block. It could be fairly large.
Something like RoundIntEvaluator does something similar.
Added 54fa11f to reuse the source block's factory and inherit its circuit breaker.
if (bytes.length != 0) {
    bytes = valueConverter.apply(bytes, scratch);
}
builder.appendBytesRef(bytes);
You're probably better off copying the bytes into a single BytesRefBuilder scratch and then passing that to the converter to modify in place. That way it doesn't have to allocate the result.
One interesting thing I've realized is that in most cases it's safe to just modify the returned bytes directly, but not in all cases. Which is kind of lame.
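A minimal sketch of that suggestion, with plain arrays standing in for BytesRef/BytesRefBuilder (hypothetical names, no ES classes): copy each value into one reusable scratch buffer and let the converter mutate it in place, so no per-value result is allocated. The byte-reversal transform is just an illustrative stand-in for the real converters.

```java
class InPlaceConvertSketch {
    private byte[] scratch = new byte[0]; // reused across all values

    // Copies the value into the scratch and transforms it in place.
    // Returns the scratch; valid bytes are [0, value.length).
    byte[] convert(byte[] value) {
        if (scratch.length < value.length) {
            scratch = new byte[value.length]; // grow, then keep reusing
        }
        System.arraycopy(value, 0, scratch, 0, value.length);
        // Illustrative in-place transform: reverse the byte order.
        for (int i = 0, j = value.length - 1; i < j; i++, j--) {
            byte tmp = scratch[i];
            scratch[i] = scratch[j];
            scratch[j] = tmp;
        }
        return scratch;
    }
}
```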
@swallez and I talked about this some offline and I'm convinced it's fine to allow this in non-snapshot builds. It's an extra layer of paranoia that in this case isn't buying us anything. It has really useful side effects when we are making inter-node communication changes, but we aren't doing that here.
@nik9000 I addressed your comments. Regarding the main points:
Restricting this to SNAPSHOT builds: since it's really peripheral (it's an output format users have to choose explicitly, and it involves no communication between cluster nodes), it seems overly paranoid to me 😉
Multivalued fields: I had not looked at them yet and treated them as single-valued blocks. Looking at the implementation this is actually wrong, since we will flatten all values up to the size of the page. I'll see if I can add them quickly. Otherwise we can either 1/ silently drop them or 2/ fail with an error. I'd rather go for 2/, which is less surprising for users (i.e. no "but where is my field?").
In c7cd12d I added a check to fail hard with a meaningful message when we encounter a multivalued block (and associated test).
👍 The reason I'm keen on involving the network isn't really the network; it's to get a view of how this all looks from the user's perspective with Netty involved. For example, throwing your exception there should come back as an error, but if we've already started the response it won't quite work right. Mostly I'm interested in the error cases and maybe one success case. As a follow-up I can try to adapt the csv-spec tests to use Arrow, which gives us great coverage without having to maintain much more stuff.
// TODO could we "just" get the memory of the array and dump it?
int count = block.getPositionCount();
for (int i = 0; i < count; i++) {
@Override
public void convert(Block b, List<ArrowBuf> bufs, List<BufWriter> bufWriters) {
    BytesRefBlock block = (BytesRefBlock) b;
    BytesRefBlock transformed = transformValues(block);
I think you'll need to close the returned transformed block. The usual way is try (BytesRefBlock transformed = transformValues(block)) { super.convert(...); }. That'll close it and decrement the memory tracking stuff.
I'm going to try and write a test that fails without this.

Bah, that's pretty much what I was going to add. I just didn't see it, sorry. But yes, I do think it needs one that hits the types that need the transform. I think a neat follow-up is to try to write a test that runs the CSV tests fetching over Arrow, but that's kind of finicky to get in, especially without multivalued fields. That'll get all the types and everything. For now, a test with some of the transformed types is good.
Added integration tests for versions and IP addresses (both are transformed on serialization) in 00aa730
nik9000 left a comment
I have discovered a small 🐛. I'll push some changes to your test in a bit and maybe the bug fix. Checking.
        builder.appendNull();
    } else {
        BytesRef bytes = block.getBytesRef(i, scratch);
        if (bytes.length != 0) {
Now that I think about it, this should also apply to empty BytesRef values. Let the converter decide what to do.
@swallez I figured out my issue. I was worried about that conversion code not cleaning up the memory tracking, and it wasn't. But your integration test didn't catch it because it was missing an explicit check for memory tracking. Now, why that isn't part of every test, I'm not sure. But it isn't; I had to opt in. And when I did, it started failing. So I added the tracking. Now I feel much better. I'm going to give this one more read and then approve it.
Hi @swallez, I've created a changelog YAML for you.

@swallez I knew which change broke the build for you, so I pushed a fix.
Initial support for Apache Arrow's streaming format as a response for ES|QL. It triggers based on the Accept header or the format request parameter. Arrow has implementations in every mainstream language and is a backend of the Python Pandas library, which is extremely popular among data scientists and data analysts. Arrow's streaming format has also become the de facto standard for dataframe interchange. It is an efficient binary format that allows zero-cost deserialization by adding data access wrappers on top of memory buffers received from the network.

This PR builds on the experiment made by @nik9000 in PR #104877

Features/limitations:
- all ES|QL data types are supported
- multi-valued fields are not supported
- fields of type _source are output as JSON text in a varchar array. In a future iteration we may want to offer the choice of the more efficient CBOR and SMILE formats.

Technical details: Arrow comes with its own memory management to handle vectors with direct memory, reference counting, etc. We don't want to use this as it conflicts with Elasticsearch's own memory management. We therefore use the Arrow library only for the metadata objects describing the dataframe schema and the structure of the streaming format. The Arrow vector data is produced directly from ES|QL blocks.

---------

Co-authored-by: Nik Everett <nik9000@gmail.com>
Adds support for Apache Arrow's streaming format as a response for ES|QL. It triggers based on the `Accept` header or the `format` request parameter.

Arrow has implementations in every mainstream language and is a backend of the Python Pandas library, which is extremely popular among data scientists and data analysts. Arrow's streaming format has also become the de facto standard for dataframe interchange. It is an efficient binary format that allows zero-cost deserialization by adding data access wrappers on top of memory buffers received from the network.
This PR builds on the experiment made by @nik9000 in PR #104877
Features/limitations
- all ES|QL data types are supported
- multi-valued fields are not supported
- fields of type `_source` are output as JSON text in a `varchar` array. In a future iteration we may want to offer the choice of the more efficient CBOR and SMILE formats.

Technical details
Arrow comes with its own memory management to handle vectors with direct memory, reference counting, etc. We don't want to use this as it conflicts with Elasticsearch's own memory management.
We therefore use the Arrow library only for the metadata objects describing the dataframe schema and the structure of the streaming format. The Arrow vector data is produced directly from ES|QL blocks.
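To make "the Arrow vector data is produced directly from ES|QL blocks" concrete, here is a hedged, self-contained sketch of the buffer layout being emitted (not the PR's actual code): an Arrow int32 vector is a little-endian value buffer plus a validity bitmap in which bit i (LSB-first within each byte) marks position i as non-null.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class ArrowLayoutSketch {
    // Little-endian int32 value buffer, as the Arrow columnar format specifies.
    static byte[] valueBuffer(int[] values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * Integer.BYTES)
            .order(ByteOrder.LITTLE_ENDIAN);
        for (int v : values) {
            buf.putInt(v);
        }
        return buf.array();
    }

    // Validity bitmap: bit i (least-significant-bit numbering) is 1 when
    // position i is non-null.
    static byte[] validityBitmap(boolean[] nonNull) {
        byte[] bits = new byte[(nonNull.length + 7) / 8];
        for (int i = 0; i < nonNull.length; i++) {
            if (nonNull[i]) {
                bits[i >> 3] |= 1 << (i & 7);
            }
        }
        return bits;
    }
}
```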
Future work
Although it's already quite featured, there are some improvements to be done, beyond the limitations outlined above:
- ... `null` values.
- We should aggregate small pages into larger Arrow record batches to remove the overhead of batch headers.

Tests
The unit tests cover all data types and random combinations of fields and sparse/dense/empty vectors. In addition to that, some manual tests have been done in Python, Rust and Java using the standard Arrow libraries.
Below is a Python example taken from PR #104877. The last 3 lines are where the Arrow format is used: read the Arrow stream, wrap it in a Pandas dataframe, and plot the data.