Skip to content

Commit f70f7a9

Browse files
committed
Fix min/max value in Iceberg writes
1 parent df3de28 commit f70f7a9

2 files changed

Lines changed: 21 additions & 5 deletions

File tree

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1232,11 +1232,6 @@ void IcebergStorageSink::consume(Chunk & chunk)
12321232
{
12331233
auto [data_filename, data_filename_in_storage] = filename_generator.generateDataFileName();
12341234
data_filenames[partition_key] = data_filename;
1235-
if (!statistics.contains(partition_key))
1236-
{
1237-
statistics.emplace(partition_key, current_schema->getArray(Iceberg::f_fields));
1238-
}
1239-
statistics.at(partition_key).update(part_chunk);
12401235

12411236
auto buffer = object_storage->writeObject(
12421237
StoredObject(data_filename_in_storage), WriteMode::Rewrite, std::nullopt, DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
@@ -1252,6 +1247,12 @@ void IcebergStorageSink::consume(Chunk & chunk)
12521247
configuration->format, *write_buffers[partition_key], *sample_block, context, format_settings);
12531248
}
12541249

1250+
if (!statistics.contains(partition_key))
1251+
{
1252+
statistics.emplace(partition_key, current_schema->getArray(Iceberg::f_fields));
1253+
}
1254+
statistics.at(partition_key).update(part_chunk);
1255+
12551256
writers[partition_key]->write(getHeader().cloneWithColumns(part_chunk.getColumns()));
12561257
}
12571258
}

tests/integration/test_storage_iceberg/test.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3269,3 +3269,18 @@ def check_validity_and_get_prunned_files(select_expression):
32693269
)
32703270
== 3
32713271
)
3272+
3273+
3274+
def test_iceberg_write_minmax(started_cluster):
3275+
instance = started_cluster.instances["node1"]
3276+
TABLE_NAME = "test_iceberg_write_minmax_" + get_uuid_str()
3277+
3278+
create_iceberg_table("local", instance, TABLE_NAME, started_cluster, "(x Int32, y Int32)", partition_by="identity(x)")
3279+
3280+
instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 1), (1, 2)", settings={"allow_experimental_insert_into_iceberg": 1})
3281+
3282+
res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=1 ORDER BY ALL").strip()
3283+
assert res == "1\t1"
3284+
3285+
res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=2 ORDER BY ALL").strip()
3286+
assert res == "1\t2"

0 commit comments

Comments
 (0)