Describe the bug
I'd expect that as I add fields to a struct, the old struct type should still be castable to the new one. As the repro below shows, this doesn't seem to be allowed:
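For context, here's the kind of cast I expect to work, as a minimal standalone sketch using Arrow's `can_cast_types`. Whether DataFusion's schema adapter goes through exactly this check is my assumption, and the extra `note` field is a hypothetical stand-in for the nested fields added in the repro:

```rust
// Minimal sketch, not part of the repro. `note` is a hypothetical field
// standing in for the evolved nested columns below.
use datafusion::arrow::compute::can_cast_types;
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};

fn main() {
    // Struct type as written by the first (older) file.
    let old = DataType::Struct(vec![
        Field::new("location", DataType::Utf8, true),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), true),
    ].into());
    // The same struct after evolution: one extra nullable field.
    let new = DataType::Struct(vec![
        Field::new("location", DataType::Utf8, true),
        Field::new("timestamp_utc", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("note", DataType::Utf8, true),
    ].into());
    // I'd expect this to be castable, with the missing field filled with nulls.
    println!("castable: {}", can_cast_types(&old, &new));
}
```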
To Reproduce
```rust
use std::fs;
use std::sync::Arc;

use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray, Float64Array};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::dataframe::DataFrameWriteOptions;
#[tokio::test]
async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
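
    // Schema for the first file: `additionalInfo` contains only
    // `location` and `timestamp_utc`.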
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
            ])),
        ],
    )?;
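
    // Write the first batch out as a single parquet file, sorted by timestamp_utc.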
    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);
    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;
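
    // Evolved schema for the second file: `additionalInfo` gains a nested
    // `reason` struct (which itself nests a `details` struct).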
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "reason",
                    DataType::Struct(vec![
                        Field::new("_level", DataType::Float64, true),
                        Field::new(
                            "details",
                            DataType::Struct(vec![
                                Field::new("rurl", DataType::Utf8, true),
                                Field::new("s", DataType::Float64, true),
                                Field::new("t", DataType::Utf8, true),
                            ].into()),
                            true,
                        ),
                    ].into()),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));
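
    // Second batch, populating the new nested `reason` column.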
    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
                (
                    Arc::new(Field::new(
                        "reason",
                        DataType::Struct(vec![
                            Field::new("_level", DataType::Float64, true),
                            Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            ),
                        ].into()),
                        true,
                    )),
                    Arc::new(StructArray::from(vec![
                        (
                            Arc::new(Field::new("_level", DataType::Float64, true)),
                            Arc::new(Float64Array::from(vec![Some(1.5)])) as Arc<dyn Array>,
                        ),
                        (
                            Arc::new(Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            )),
                            Arc::new(StructArray::from(vec![
                                (
                                    Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("https://example.com")])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("s", DataType::Float64, true)),
                                    Arc::new(Float64Array::from(vec![Some(3.14)])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("t", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("data")])) as Arc<dyn Array>,
                                ),
                            ])),
                        ),
                    ])),
                ),
            ])),
        ],
    )?;
    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);
    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;
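
    // Register both files as a single listing table, using the evolved
    // schema2 as the table schema and declaring the file sort order.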
    let paths_str = vec![path1.to_string(), path2.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?
    )
    .with_schema(schema2.as_ref().clone().into())
    .infer(&ctx.state()).await?;
    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![
                col("timestamp_utc").sort(true, true),
            ]],
            ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };
    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let results = df.clone().collect().await?;
    assert_eq!(results[0].num_rows(), 2);
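
    // Compact the two files into one parquet file.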
    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);
    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None
    ).await?;
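
    // Re-read the compacted file in a fresh context and verify the rows
    // round-trip unchanged.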
    let new_ctx = SessionContext::new();
    let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
        .with_schema(schema2.as_ref().clone().into())
        .infer(&new_ctx.state()).await?;
    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let compacted_results = df.collect().await?;
    assert_eq!(compacted_results[0].num_rows(), 2);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(compacted_path);
    Ok(())
}
```
Running the test produces:

```text
Error: Plan("Cannot cast file schema field additionalInfo of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"reason\", data_type: Struct([Field { name: \"_level\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"details\", data_type: Struct([Field { name: \"rurl\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"s\", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"t\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]) to table schema field of type Struct([Field { name: \"location\", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"timestamp_utc\", data_type: Timestamp(Millisecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])
```
Expected behavior
I expected the test to pass.
Additional context
No response