-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Describe the bug
When a column has a Dictionary data type, the Parquet metadata statistics return Exact(Dictionary(Int32, Utf8(NULL))) for both the min and max values, even though the column contains non-null values.
To Reproduce
Run the test below from the test module of this file (the one that defines `store_parquet` and `statistics_from_parquet_meta`):
#[tokio::test]
async fn test_statistics_from_parquet_metadat_dictionary() -> Result<()> {
    // Reproducer: min/max statistics of a Dictionary(Int32, Utf8) column, read
    // back from Parquet metadata, come out as typed NULLs.
    //
    // Data for column c_dic: ["a", "b", "c", "d"]. The keys reference every
    // dictionary value exactly once, so the column really holds these four
    // strings (keys [0, 0, 1, 2] would yield ["a", "a", "b", "c"] instead).
    let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
    let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
    let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
    let c_dic: ArrayRef = Arc::new(dic_array);

    // Define the schema
    let field = Field::new(
        "c_dic",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    );
    let schema = Schema::new(vec![field]);

    // Create the RecordBatch
    let batch1 = RecordBatch::try_new(Arc::new(schema), vec![c_dic])?;

    // Use store_parquet to write each batch to its own file:
    // - batch1 is written into the first file and contains column c_dic with
    //   4 rows and no nulls. Its min/max statistics are missing even though
    //   the column has values; that is the bug this test reproduces.
    let store = Arc::new(LocalFileSystem::new()) as _;
    let (files, _file_names) = store_parquet(vec![batch1], false).await?;

    let state = SessionContext::new().state();
    let format = ParquetFormat::default();
    let schema = format.infer_schema(&state, &store, &files).await?;

    // Fetch statistics for the first file
    let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
    let stats = statistics_from_parquet_meta(&pq_meta, schema).await?;
    assert_eq!(stats.num_rows, Precision::Exact(4));

    // column c_dic
    let c_dic_stats = &stats.column_statistics[0];
    let null_dic = ScalarValue::Dictionary(
        Box::new(DataType::Int32),
        Box::new(ScalarValue::Utf8(None)),
    );
    assert_eq!(c_dic_stats.null_count, Precision::Exact(0));

    // BUG: the two assertions below pin the current (wrong) output, so the
    // test passes today and documents the problem.
    // Expected: Exact(Dictionary(Int32, Utf8("d")))
    // Got:      Exact(Dictionary(Int32, Utf8(NULL)))
    assert_eq!(c_dic_stats.max_value, Precision::Exact(null_dic.clone()));
    // Expected: Exact(Dictionary(Int32, Utf8("a")))
    // Got:      Exact(Dictionary(Int32, Utf8(NULL)))
    assert_eq!(c_dic_stats.min_value, Precision::Exact(null_dic));

    Ok(())
}
Expected behavior
The statistics should report the column's actual min and max values. For the reproducer above, whose column is meant to hold the values "a" through "d", I expect:
max_value: Exact(Dictionary(Int32, Utf8("d")))
min_value: Exact(Dictionary(Int32, Utf8("a")))
Additional context
The underlying statistics extraction code should have no problem extracting statistics from Dictionary columns.
The relevant code is:
datafusion/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Lines 452 to 454 in 7e49ccf
| DataType::Dictionary(_, value_type) => { | |
| [<$stat_type_prefix:lower _ statistics>](value_type, $iterator) | |
| } |
And the tests are here:
datafusion/datafusion/core/tests/parquet/arrow_statistics.rs
Lines 1729 to 1768 in 7e49ccf
| async fn test_dictionary() { | |
| let reader = TestReader { | |
| scenario: Scenario::Dictionary, | |
| row_per_group: 5, | |
| } | |
| .build() | |
| .await; | |
| Test { | |
| reader: &reader, | |
| expected_min: Arc::new(StringArray::from(vec!["abc", "aaa"])), | |
| expected_max: Arc::new(StringArray::from(vec!["def", "fffff"])), | |
| expected_null_counts: UInt64Array::from(vec![1, 0]), | |
| expected_row_counts: Some(UInt64Array::from(vec![5, 2])), | |
| column_name: "string_dict_i8", | |
| check: Check::RowGroup, | |
| } | |
| .run(); | |
| Test { | |
| reader: &reader, | |
| expected_min: Arc::new(StringArray::from(vec!["abc", "aaa"])), | |
| expected_max: Arc::new(StringArray::from(vec!["def", "fffff"])), | |
| expected_null_counts: UInt64Array::from(vec![1, 0]), | |
| expected_row_counts: Some(UInt64Array::from(vec![5, 2])), | |
| column_name: "string_dict_i32", | |
| check: Check::RowGroup, | |
| } | |
| .run(); | |
| Test { | |
| reader: &reader, | |
| expected_min: Arc::new(Int64Array::from(vec![-100, 0])), | |
| expected_max: Arc::new(Int64Array::from(vec![0, 100])), | |
| expected_null_counts: UInt64Array::from(vec![1, 0]), | |
| expected_row_counts: Some(UInt64Array::from(vec![5, 2])), | |
| column_name: "int_dict_i8", | |
| check: Check::RowGroup, | |
| } | |
| .run(); |
I wonder whether the code that summarizes the statistics across row groups
datafusion/datafusion/core/src/datasource/file_format/parquet.rs
Lines 468 to 495 in 7e49ccf
| fn summarize_min_max_null_counts( | |
| min_accs: &mut [Option<MinAccumulator>], | |
| max_accs: &mut [Option<MaxAccumulator>], | |
| null_counts_array: &mut [Precision<usize>], | |
| arrow_schema_index: usize, | |
| num_rows: usize, | |
| stats_converter: &StatisticsConverter, | |
| row_groups_metadata: &[RowGroupMetaData], | |
| ) -> Result<()> { | |
| let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; | |
| let min_values = stats_converter.row_group_mins(row_groups_metadata)?; | |
| let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; | |
| if let Some(max_acc) = &mut max_accs[arrow_schema_index] { | |
| max_acc.update_batch(&[max_values])?; | |
| } | |
| if let Some(min_acc) = &mut min_accs[arrow_schema_index] { | |
| min_acc.update_batch(&[min_values])?; | |
| } | |
| null_counts_array[arrow_schema_index] = Precision::Exact(match sum(&null_counts) { | |
| Some(null_count) => null_count as usize, | |
| None => num_rows, | |
| }); | |
| Ok(()) | |
| } |
doesn't handle dictionaries correctly 🤔