-
Notifications
You must be signed in to change notification settings - Fork 4.1k
[FlightRPC] Cannot use flight data with DataFusion (Rust) #43552
Copy link
Copy link
Open
Description
Describe the bug, including details regarding any error messages, version, and platform.
Fetching data via Apache Arrow Flight (with C++ and Python involved) and passing it to Apache DataFusion (Rust) does not work:
Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type.
Before importing buffer through FFI, please make sure the allocation is aligned.
This is likely due to #32276 / apache/arrow-java#186.
Error:
thread '<unnamed>' panicked at /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-buffer-52.0.0/src/buffer/scalar.rs:138:17:
Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.
stack backtrace:
0: 0x7f4d25f576ea - <std::sys::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h584e154fdf2d8641
1: 0x7f4d24a0eb7b - core::fmt::write::h810564c4cb1595da
2: 0x7f4d25f252a2 - std::io::Write::write_fmt::ha00a1de7318f2a48
3: 0x7f4d25f5cb29 - std::sys::backtrace::print::h6238e978e425409a
4: 0x7f4d25f5c316 - std::panicking::default_hook::{{closure}}::h0acffbc0a684bdb8
5: 0x7f4d25f5d6f5 - std::panicking::rust_panic_with_hook::hcdf40f293c76fc9f
6: 0x7f4d25f5cec2 - std::panicking::begin_panic_handler::{{closure}}::hfd12f36809a34009
7: 0x7f4d25f5ce59 - std::sys::backtrace::__rust_end_short_backtrace::h6a9267615b3cf1db
8: 0x7f4d25f5ce44 - rust_begin_unwind
9: 0x7f4d24a0d4f2 - core::panicking::panic_fmt::hcabcb14b752ed0b3
10: 0x7f4d246177b1 - arrow_buffer::buffer::scalar::ScalarBuffer<T>::new::h04fe130fe772026c
11: 0x7f4d254255a0 - arrow_array::array::get_offsets::h216d5a4c8918fc01
12: 0x7f4d2438af33 - arrow_array::array::make_array::h2cb6f33b6d6b2c59
13: 0x7f4d24390613 - <arrow_array::array::struct_array::StructArray as core::convert::From<arrow_data::data::ArrayData>>::from::h07d61a4146018136
14: 0x7f4d24246b46 - <arrow_array::record_batch::RecordBatch as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::h1af3cfef0e2e6e5a
15: 0x7f4d23f169cb - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::h46b0b854c3de8071
16: 0x7f4d240b7375 - <alloc::vec::Vec<T> as arrow::pyarrow::FromPyArrow>::from_pyarrow_bound::he22d7662854ef688
17: 0x7f4d23f1864b - <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next::hd78ea1387d583d39
18: 0x7f4d240293e0 - pyo3::impl_::extract_argument::extract_argument::hf1fa544aa176067c
19: 0x7f4d2413265f - datafusion_python::context::PySessionContext::__pymethod_create_dataframe__::h0938b7fdddde4e81
20: 0x7f4d24026e61 - pyo3::impl_::trampoline::trampoline::h86dfdb741875b71a
21: 0x7f4d2412a441 - datafusion_python::context::<impl pyo3::impl_::pyclass::PyMethods<datafusion_python::context::PySessionContext> for pyo3::impl_::pyclass::PyClassImplCollector<datafusion_python::context::PySessionContext>>::py_methods::ITEMS::trampoline::hd70e045974ca5aa8
22: 0x560f47b75569 - <unknown>
23: 0x560f47b5cb2b - _PyEval_EvalFrameDefault
24: 0x560f47b746ac - _PyFunction_Vectorcall
25: 0x560f47b5c935 - _PyEval_EvalFrameDefault
26: 0x560f47b59096 - <unknown>
27: 0x560f47c4ef66 - PyEval_EvalCode
28: 0x560f47c79e98 - <unknown>
29: 0x560f47c7379b - <unknown>
30: 0x560f47c79be5 - <unknown>
31: 0x560f47c790c8 - _PyRun_SimpleFileObject
32: 0x560f47c78d13 - _PyRun_AnyFileObject
33: 0x560f47c6b70e - Py_RunMain
34: 0x560f47c41dfd - Py_BytesMain
35: 0x7f4d5a029d90 - __libc_start_call_main
at ./csu/../sysdeps/nptl/libc_start_call_main.h:58:16
36: 0x7f4d5a029e40 - __libc_start_main_impl
at ./csu/../csu/libc-start.c:392:3
37: 0x560f47c41cf5 - _start
38: 0x0 - <unknown>
Traceback (most recent call last):
File "/home/enrico/git/arrow-datafusion-issue/example.py", line 41, in <module>
main(sys.argv[1])
File "/home/enrico/git/arrow-datafusion-issue/example.py", line 33, in main
df = ctx.create_dataframe([[batch for batch in partition] for partition in partitions])
pyo3_runtime.PanicException: Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned.
Reproduce as follows:
git clone --depth=1 https://github.com/apache/arrow.git
git clone --depth=1 https://github.com/apache/arrow-testing.git
python -m venv venv
source venv/bin/activate
pip install pyarrow pandas datafusion
python arrow/python/examples/flight/server.py
RUST_BACKTRACE=1 python example.py arrow-testing/data/csv/aggregate_test_100.csv
with example.py:
import sys
import datafusion
import pyarrow
import pyarrow.flight
import pyarrow.csv as csv
def push_data(client, path):
my_table = csv.read_csv(path).select(["c1"])
df = my_table.to_pandas()
writer, _ = client.do_put(pyarrow.flight.FlightDescriptor.for_path("file"), my_table.schema)
writer.write_table(my_table)
writer.close()
def get_data(client):
descriptor = pyarrow.flight.FlightDescriptor.for_path("file")
info = client.get_flight_info(descriptor)
for endpoint in info.endpoints:
for location in endpoint.locations:
get_client = pyarrow.flight.FlightClient(location)
reader = get_client.do_get(endpoint.ticket)
yield reader.to_reader()
def main(path):
client = pyarrow.flight.FlightClient(f"grpc+tcp://localhost:5005")
push_data(client, path)
partitions = get_data(client)
ctx = datafusion.SessionContext()
df = ctx.create_dataframe([[batch for batch in partition] for partition in partitions])
print(df)
if __name__ == '__main__':
if len(sys.argv) != 2:
print("Provide the path to example CSV file")
sys.exit(1)
main(sys.argv[1])
The error is thrown in the Apache Arrow Rust implementation: https://github.com/apache/arrow-rs/blob/eddef43d1cb46c1287da187ea1d86b0e1dc35a13/arrow-buffer/src/buffer/scalar.rs#L138
let align = std::mem::align_of::<T>();
let is_aligned = buffer.as_ptr().align_offset(align) == 0;
match buffer.deallocation() {
Deallocation::Standard(_) => assert!(
is_aligned,
"Memory pointer is not aligned with the specified scalar type"
),
Deallocation::Custom(_, _) =>
assert!(is_aligned, "Memory pointer from external source (e.g, FFI) is not aligned with the specified scalar type. Before importing buffer through FFI, please make sure the allocation is aligned."),
}
In my environment, the result depends on the CSV column: for c1 (type i32) align is 4 and buffer.as_ptr().align_offset(align) is always 3; for c2 (type i64) align is 8 and buffer.as_ptr().align_offset(align) is always 7. arrow-rs requires the offset to be 0.
Component(s)
FlightRPC
Reactions are currently unavailable