
Parquet write failure (from record batches) when data is nested two levels deep  #1744

@ahmedriza

Description


Describe the bug
Let me introduce the schema of the data in an easily readable format (Apache Spark's pretty-print format):

root
 |-- id: string (nullable = true)
 |-- prices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- value: double (nullable = true)
 |    |    |-- meta: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- loc: string (nullable = true)
 |-- bids: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- currency: string (nullable = true)
 |    |    |-- value: double (nullable = true)
 |    |    |-- meta: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- loc: string (nullable = true)

and some sample data:

+---+----------------------+----+
|id |prices                |bids|
+---+----------------------+----+
|t1 |[{GBP, 3.14, [{LON}]}]|null|
|t2 |[{USD, 4.14, [{NYC}]}]|null|
+---+----------------------+----+

As we can see, we have three columns: a UTF-8 column called id, and two columns, prices and bids, that share the same nested type, i.e. a list of structs whose meta field is itself a list of structs (list<struct<list<struct>>>).

I have deliberately left the bids column null to expose the bug.

The bug is that when we read Parquet with the above schema, with the bids column null for all rows, from Rust into record batches and then write those record batches back to Parquet, the write fails with:

Error: Parquet error: Incorrect number of rows, expected 2 != 0 rows

This is happening at https://github.com/apache/arrow-rs/blob/master/parquet/src/file/writer.rs#L324 and is due to the bids column being null for every row.
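
For reference, here is a condensed paraphrase (my own sketch, not the exact arrow-rs source) of the consistency check behind this error: every column in a row group must report the same number of rows, and the all-null nested bids column ends up reporting 0.

use parquet::errors::{ParquetError, Result};

// Hedged paraphrase of the row-count check in parquet/src/file/writer.rs:
// if one column reports a different number of rows than the others, the
// row group is rejected with the error quoted above.
fn check_row_count(expected: usize, written: usize) -> Result<()> {
    if expected != written {
        return Err(ParquetError::General(format!(
            "Incorrect number of rows, expected {} != {} rows",
            expected, written
        )));
    }
    Ok(())
}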

To Reproduce

Let's create the sample data with the schema depicted above, using the following Python code:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

d1 = {
    "id": pd.Series(['t1', 't2']),
    "prices": pd.Series([
        [
            {
                "currency": "GBP",
                "value": 3.14,
                'meta': [
                    {'loc': 'LON'}
                ]
            }
        ],
        [
            {
                "currency": "USD",
                "value": 4.14,
                'meta': [
                    {'loc': 'NYC'}
                ]                
            }
        ]
    ]),
    "bids": pd.Series([], dtype='object')
}

df = pd.DataFrame(d1)

list_type = pa.list_(
    pa.struct([
        ('currency', pa.string()),
        ('value', pa.float64()),
        ('meta', pa.list_(
            pa.struct([
                ('loc', pa.string())
            ])
        ))
    ]))

schema = pa.schema([
    ('id', pa.string()),
    ('prices', list_type),
    ('bids', list_type)
])

table = pa.Table.from_pandas(df, schema=schema)
filename = '/tmp/demo_one_arrow.parquet'
pq.write_table(table, filename)

expected_table = pq.read_table(filename).to_pandas()
print(expected_table.to_string())

When we run this code, we can see that a valid Parquet file is indeed produced. Reading it back, we see the following:

   id                                                          prices  bids
0  t1  [{'currency': 'GBP', 'value': 3.14, 'meta': [{'loc': 'LON'}]}]  None
1  t2  [{'currency': 'USD', 'value': 4.14, 'meta': [{'loc': 'NYC'}]}]  None

Let's now try to read the same Parquet file from Rust and write it back to another Parquet file:

use std::{fs::File, path::Path, sync::Arc};

use arrow::record_batch::RecordBatch;
use parquet::{
    arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
    file::serialized_reader::SerializedFileReader,
};
use std::error::Error;
type Result<T> = std::result::Result<T, Box<dyn Error>>;

pub fn main() -> Result<()> {
    let filename = "/tmp/demo_one_arrow.parquet";

    // Read Parquet from file
    let record_batches = read_parquet(filename)?;

    println!("Writing Parquet...");
    // write what we just read
    write_parquet("/tmp/demo_one_arrow2.parquet", record_batches)?;

    println!("Reading back...");
    // Read back what we just wrote
    let expected_batches = read_parquet("/tmp/demo_one_arrow2.parquet")?;
    let _expected_columns = expected_batches
        .iter()
        .map(|rb| rb.columns())
        .collect::<Vec<_>>();
    
    Ok(())
}

fn read_parquet(filename: &str) -> Result<Vec<RecordBatch>> {
    let path = Path::new(filename);
    let file = File::open(path)?;
    println!("Reading {}", filename);
    let reader = SerializedFileReader::new(file)?;
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader));
    let rb_reader = arrow_reader.get_record_reader(1024)?;
    let mut record_batches = vec![];
    for rb_result in rb_reader {
        let rb = rb_result?;
        record_batches.push(rb);
    }
    Ok(record_batches)
}

fn write_parquet(filename: &str, record_batches: Vec<RecordBatch>) -> Result<()> {
    let file = File::create(filename)?;
    let schema = record_batches[0].schema();
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    for batch in record_batches {
        writer.write(&batch)?;
    }
    writer.close()?;
    Ok(())
}

This reads the Parquet file fine, but fails when writing the record batches back out as Parquet, with the following error:

Error: Parquet error: Incorrect number of rows, expected 2 != 0 rows

I can see that this is because the bids column is null.
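
For completeness, the same all-null column can be constructed directly in memory, skipping the Python step. A minimal sketch (my own construction, assuming arrow's new_null_array yields the same all-null representation that the Parquet reader produces; it reuses the Result alias from the program above):

use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Build list<struct<currency: utf8, value: f64, meta: list<struct<loc: utf8>>>>,
// i.e. the doubly nested type shared by prices and bids.
fn nested_list_type() -> DataType {
    let meta_item = DataType::Struct(vec![Field::new("loc", DataType::Utf8, true)]);
    let price_struct = DataType::Struct(vec![
        Field::new("currency", DataType::Utf8, true),
        Field::new("value", DataType::Float64, true),
        Field::new(
            "meta",
            DataType::List(Box::new(Field::new("element", meta_item, true))),
            true,
        ),
    ]);
    DataType::List(Box::new(Field::new("element", price_struct, true)))
}

// Two rows with id populated and both nested columns entirely null;
// a null nested column is what triggers the failing row count.
fn make_batch() -> Result<RecordBatch> {
    let list_type = nested_list_type();
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, true),
        Field::new("prices", list_type.clone(), true),
        Field::new("bids", list_type, true),
    ]));
    let id: ArrayRef = Arc::new(StringArray::from(vec!["t1", "t2"]));
    let prices = new_null_array(schema.field(1).data_type(), 2);
    let bids = new_null_array(schema.field(2).data_type(), 2);
    Ok(RecordBatch::try_new(schema, vec![id, prices, bids])?)
}

Writing the batch returned by make_batch with the write_parquet function above should fail with the same error.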

Expected behavior

The record batches should be written to Parquet correctly even if a column is null for all rows.
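
Concretely, using the read_parquet and write_parquet helpers above, the round trip should succeed and preserve the total row count:

// Expected behavior, expressed as an assertion: the write succeeds and
// the round trip preserves the row count even though bids is all-null.
let batches = read_parquet("/tmp/demo_one_arrow.parquet")?;
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
write_parquet("/tmp/demo_one_arrow2.parquet", batches)?;
let roundtrip = read_parquet("/tmp/demo_one_arrow2.parquet")?;
assert_eq!(total, roundtrip.iter().map(|b| b.num_rows()).sum::<usize>());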

Additional context
The issue arises from the presence of the second level of nesting, i.e. the following:

('meta', pa.list_(
            pa.struct([
                ('loc', pa.string())
            ])
        ))

If we remove this second level of nesting, the null bids column does get written. However, we expect this to work in the presence of a second, third, etc. level of nesting, as it does with pyarrow.
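
For comparison, here is the singly nested Arrow type (my sketch of the variant with meta removed) that does round-trip even when the column is all-null:

// list<struct<currency: utf8, value: f64>> -- with the inner meta list
// removed, the all-null bids column is written without error.
let flat_struct = DataType::Struct(vec![
    Field::new("currency", DataType::Utf8, true),
    Field::new("value", DataType::Float64, true),
]);
let flat_list = DataType::List(Box::new(Field::new("element", flat_struct, true)));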
