-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Description
Describe the bug
Let me introduce the Schema of the data in an easily readable format (the Apache Spark pretty print format):
root
|-- id: string (nullable = true)
|-- prices: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- currency: string (nullable = true)
| | |-- value: double (nullable = true)
| | |-- meta: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- loc: string (nullable = true)
|-- bids: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- currency: string (nullable = true)
| | |-- value: double (nullable = true)
| | |-- meta: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- loc: string (nullable = true)
and some sample data:
+---+----------------------+----+
|id |prices |bids|
+---+----------------------+----+
|t1 |[{GBP, 3.14, [{LON}]}]|null|
|t2 |[{USD, 4.14, [{NYC}]}]|null|
+---+----------------------+----+
As we can see, what we have here are three columns, a UTF-8 column called id and two columns called prices and bid that have the schema, i.e. list<struct<list>>.
I have deliberately left the bids column empty to show the bug.
The bug is that when when we read Parquet with the above schema, with the bids column null for all rows, from Rust code into record batches and then write those record batches to Parquet; the Parquet write fails with:
Error: Parquet error: Incorrect number of rows, expected 2 != 0 rows
This is happening at https://github.com/apache/arrow-rs/blob/master/parquet/src/file/writer.rs#L324 and is due to the fact that the bids column is null.
To Reproduce
Let's create the sample data using the schema as depicted above using the following Python code:
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
d1 = {
"id": pd.Series(['t1', 't2']),
"prices": pd.Series([
[
{
"currency": "GBP",
"value": 3.14,
'meta': [
{'loc': 'LON'}
]
}
],
[
{
"currency": "USD",
"value": 4.14,
'meta': [
{'loc': 'NYC'}
]
}
]
]),
"bids": pd.Series([], dtype='object')
}
df = pd.DataFrame(d1)
list_type = pa.list_(
pa.struct([
('currency', pa.string()),
('value', pa.float64()),
('meta', pa.list_(
pa.struct([
('loc', pa.string())
])
))
]))
schema = pa.schema([
('id', pa.string()),
('prices', list_type),
('bids', list_type)
])
table = pa.Table.from_pandas(df, schema=schema)
filename = '/tmp/demo_one_arrow.parquet'
pq.write_table(table, filename)
expected_table = pq.read_table(filename).to_pandas()
print(expected_table.to_string())
When we run this code, we can see that a valid Parquet file is indeed produced. The Parquet that is created is read back and we see the following:
id prices bids
0 t1 [{'currency': 'GBP', 'value': 3.14, 'meta': [{'loc': 'LON'}]}] None
1 t2 [{'currency': 'USD', 'value': 4.14, 'meta': [{'loc': 'NYC'}]}] None
Let's now try to read the same Parquet from Rust and write it back to another Parquet file:
use std::{fs::File, path::Path, sync::Arc};
use arrow::record_batch::RecordBatch;
use parquet::{
arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
file::serialized_reader::SerializedFileReader,
};
use std::error::Error;
type Result<T> = std::result::Result<T, Box<dyn Error>>;
pub fn main() -> Result<()> {
let filename = "/tmp/demo_one_arrow.parquet";
// Read Parquet from file
let record_batches = read_parquet(filename)?;
println!("Writing Parquet...");
// write what we just read
write_parquet("/tmp/demo_one_arrow2.parquet", record_batches)?;
println!("Reading back...");
// Read back what we just wrote
let expected_batches = read_parquet("/tmp/demo_one_arrow2.parquet")?;
let _expected_columns = expected_batches
.iter()
.map(|rb| rb.columns())
.collect::<Vec<_>>();
Ok(())
}
fn read_parquet(filename: &str) -> Result<Vec<RecordBatch>> {
let path = Path::new(filename);
let file = File::open(path)?;
println!("Reading {}", filename);
let reader = SerializedFileReader::new(file)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
let rb_reader = arrow_reader.get_record_reader(1024)?;
let mut record_batches = vec![];
for rb_result in rb_reader {
let rb = rb_result?;
record_batches.push(rb);
}
Ok(record_batches)
}
fn write_parquet(filename: &str, record_batches: Vec<RecordBatch>) -> Result<()> {
let file = File::create(filename)?;
let schema = record_batches[0].schema();
let mut writer = ArrowWriter::try_new(file, schema, None)?;
for batch in record_batches {
writer.write(&batch)?;
}
writer.close()?;
Ok(())
}
This reads the Parquet file fine, but fails when writing out the record batches as Parquet with the following error:
Error: Parquet error: Incorrect number of rows, expected 2 != 0 rows
I can see that this is due to the fact the bids column is null.
Expected behavior
We should expect the record batches to be written correctly to Parquet even if a column is null for all rows.
Additional context
The issue arises due to the presence of the second level of nesting, i.e. the following
('meta', pa.list_(
pa.struct([
('loc', pa.string())
])
))
If we remove this second level of nesting, then the null bids column does get written. However, we expect this to work even in the presence of the second, third etc level of nesting, which works with pyarrow as well.