
Apache arrow support for ES|QL#104877

Closed
nik9000 wants to merge 6 commits into elastic:main from nik9000:arrow

Conversation

@nik9000
Member

@nik9000 nik9000 commented Jan 29, 2024

This adds support for Apache Arrow's streaming
format as a response from ES|QL. It triggers based on the Accept
header or a format request parameter.

This is neat because Arrow is an efficient format to read: the consumer has
to do very little to take the data from the wire and put
it on the screen. Because Arrow is efficient, folks are excited about
it and are making it available in lots of tools. ES|QL could piggyback
on that. For example, to use ES|QL in a Jupyter notebook with this PR
you'd do this:

import base64
import urllib3
import pyarrow as pa
import matplotlib.pyplot as plt

plt.close("all")
timeout = urllib3.Timeout(connect=2.0, read=300.0)
http = urllib3.PoolManager(timeout=timeout)

def basic(login, password):
    encoded = base64.b64encode((login + ":" + password).encode()).decode()
    return "Basic %s" % encoded

resp = http.request(
    "POST",
    "http://localhost:9200/_query?format=arrow",
    headers={
        "authorization": basic("elastic", "password")
    },
    json={
        "query": """
        FROM nyc_taxis
      | WHERE trip_distance > 0 AND trip_distance < 100 AND fare_amount > 1
      | STATS min=MIN(fare_amount), avg=AVG(fare_amount), max=MAX(fare_amount) BY pickup_datetime=DATE_TRUNC(1 DAY, pickup_datetime)
      | EVAL LOG10(min), LOG10(max), LOG10(avg)
      | DROP min, max, avg
      | SORT pickup_datetime
      | LIMIT 100000"""
    })

if resp.status != 200:
    print("failed: %s" % resp.data)
else:
    with pa.ipc.open_stream(resp.data) as reader:
        df = reader.read_pandas()
        df.plot.line(x="pickup_datetime")

That's about a third ES|QL and half ceremony. The last three lines are
super dense:

  1. Convert the stream of bytes into an Arrow reader
  2. Convert the reader into pandas
  3. Plot the line using matplotlib

That's a little import antigravity. But that's great when
it works! And Kibana should be able to integrate with it. Probably not as
simply as pandas, but we can get it!

Arrow is kind of a perfect output format for ES|QL in that it's a
batched columnar format. The output from this PR looks like:

<SCHEMA>
<BATCH 0>
<BATCH 1>
<BATCH 2>
<BATCH 3>
...
<0xFFFFFFFF 0x00000000>

Each batch looks like:

<HEADER>
<VALIDITY 0>
<DATA 0>
<VALIDITY 1>
<DATA 1>
<VALIDITY 2>
<DATA 2>
<VALIDITY 3>
<LENGTH 3>
<DATA 3>

Dense data like int, long, and double always have a VALIDITY and a
DATA buffer. Variable-length data like keywords have VALIDITY, LENGTH,
and DATA buffers. This format is pretty close to ES|QL's own in-memory
representation. And, the best part is that ES|QL's internal
response is also batched. We have batches, and, if we're very lucky, we
could integrate this with Pausable Chunked Responses (#104851). It
should integrate perfectly. Should.

// return false;
// }
// NOCOMMIT disabled security manager
return true;
Member Author


We can't commit this. But I spent half a day messing with the security manager and gave up. We should be able to fix this.

};
}

static ChunkedRestResponseBody fromMany(ChunkedRestResponseBody first, Iterator<? extends ChunkedRestResponseBody> rest) {
Member Author


This'll want javadoc. But I think it's generally useful.

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
try {
return current.encodeChunk(sizeHint, recycler);
Member Author


I haven't double checked this, but I'm worried that this'll send a chunk over the wire no matter how big the reference is. If it's less than the sizeHint we should probably give the next one a chance.

* Initialize the Arrow shim. Arrow does some interesting reflection stuff on
* initialization. We can avoid it if we
*/
public static void init() {
Member Author


Arrow has some "interesting" code in its initialization. In an effort to be magic it looks around the classpath and calls setAccessible on something. That something is already public, but the security manager still fails.

We can avoid the whole classpath scanning and reflection magic with some contained magic of our own - this stuff. But we need security manager privileges, so I moved this to its own tiny jar.

Lastly, this replaces everything Arrow does with Unsafe with a shim that does nothing. That's fine for how we're using it.

@nik9000
Member Author

nik9000 commented Jul 2, 2024

#109873 finished this one.

@nik9000 nik9000 closed this Jul 2, 2024
swallez added a commit that referenced this pull request Jul 3, 2024
Initial support for Apache Arrow's streaming format as a response for ES|QL. It triggers based on the Accept header or the format request parameter.

Arrow has implementations in every mainstream language and is a backend of the Python Pandas library, which is extremely popular among data scientists and data analysts. Arrow's streaming format has also become the de facto standard for dataframe interchange. It is an efficient binary format that allows zero-cost deserialization by adding data access wrappers on top of memory buffers received from the network.

This PR builds on the experiment made by @nik9000 in PR #104877

Features/limitations:
- all ES|QL data types are supported
- multi-valued fields are not supported
- fields of type _source are output as JSON text in a varchar array. In a future iteration we may want to offer the choice of the more efficient CBOR and SMILE formats.

Technical details:

Arrow comes with its own memory management to handle vectors with direct memory, reference counting, etc. We don't want to use this as it conflicts with Elasticsearch's own memory management.

We therefore use the Arrow library only for the metadata objects describing the dataframe schema and the structure of the streaming format. The Arrow vector data is produced directly from ES|QL blocks.

---------

Co-authored-by: Nik Everett <nik9000@gmail.com>
tvernum pushed a commit that referenced this pull request Feb 25, 2025
tvernum pushed a commit that referenced this pull request Feb 25, 2025