Skip to content

Commit 1d9bab3

Browse files
committed
Fix bugs
1 parent 1083188 commit 1d9bab3

11 files changed

Lines changed: 76 additions & 18 deletions

File tree

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ std::optional<Int32> IcebergMetadata::getSchemaVersionByFileIfOutdated(String da
283283
auto manifest_file_it = manifest_file_by_data_file.find(data_path);
284284
if (manifest_file_it == manifest_file_by_data_file.end())
285285
{
286-
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find schema version for data file: {}", data_path);
286+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find manifest file for data file: {}", data_path);
287287
}
288288
const ManifestFileContent & manifest_file = *manifest_file_it->second;
289289
auto schema_id = manifest_file.getSchemaId();
@@ -335,6 +335,30 @@ ManifestList IcebergMetadata::initializeManifestList(const String & filename) co
335335
auto manifest_list_file_reader
336336
= std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
337337

338+
339+
LOG_DEBUG(&Poco::Logger::get("IcebergMetadata, ManifestList"), "dataSchema: {}", manifest_list_file_reader->dataSchema().toJson(true));
340+
341+
std::stringstream data_schema_root_ss;
342+
manifest_list_file_reader->dataSchema().root()->printJson(data_schema_root_ss, 10);
343+
LOG_DEBUG(&Poco::Logger::get("IcebergMetadata, ManifestList"), "dataSchema root: {}", data_schema_root_ss.str());
344+
345+
LOG_DEBUG(
346+
&Poco::Logger::get("IcebergMetadata, ManifestList"),
347+
"dataSchema root type: {}",
348+
manifest_list_file_reader->dataSchema().root()->type());
349+
350+
for (size_t i = 0; i < manifest_list_file_reader->dataSchema().root()->leaves(); ++i)
351+
{
352+
const auto & field = manifest_list_file_reader->dataSchema().root()->leafAt(static_cast<int>(i));
353+
354+
const auto & field_name = manifest_list_file_reader->dataSchema().root()->nameAt(static_cast<int>(i));
355+
356+
std::stringstream ss;
357+
field->printJson(ss, 10);
358+
LOG_DEBUG(&Poco::Logger::get("IcebergMetadata, ManifestList"), "field: {}", ss.str());
359+
LOG_DEBUG(&Poco::Logger::get("IcebergMetadata, ManifestList"), "field name: {}", field_name);
360+
}
361+
338362
auto [name_to_index, name_to_data_type, header] = getColumnsAndTypesFromAvroByNames(
339363
manifest_list_file_reader->dataSchema().root(),
340364
{"manifest_path", "sequence_number"},
@@ -384,9 +408,16 @@ ManifestList IcebergMetadata::initializeManifestList(const String & filename) co
384408
{
385409
added_sequence_number = sequence_number_column.value()->getInt(i);
386410
}
387-
auto [manifest_file_iterator, _inserted]
388-
= manifest_files_by_name.emplace(current_filename, initializeManifestFile(current_filename, added_sequence_number));
389-
manifest_list.push_back(ManifestListFileEntry{ManifestFileIterator{manifest_file_iterator}, added_sequence_number});
411+
/// We can't encapsulate this logic in getManifestFile because we need not only the name of the file, but also an inherited sequence number which is known only during the parsing of ManifestList
412+
auto manifest_file_content = initializeManifestFile(current_filename, added_sequence_number);
413+
auto [iterator, _inserted] = manifest_files_by_name.emplace(current_filename, std::move(manifest_file_content));
414+
auto manifest_file_iterator = ManifestFileIterator{iterator};
415+
for (const auto & data_file_path : manifest_file_iterator->getFiles())
416+
{
417+
if (std::holds_alternative<DataFileEntry>(data_file_path.file))
418+
manifest_file_by_data_file.emplace(std::get<DataFileEntry>(data_file_path.file).file_name, manifest_file_iterator);
419+
}
420+
manifest_list.push_back(ManifestListFileEntry{manifest_file_iterator, added_sequence_number});
390421
}
391422

392423
return manifest_list;
@@ -421,6 +452,14 @@ ManifestFileIterator IcebergMetadata::getManifestFile(const String & filename) c
421452
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot find manifest file: {}", filename);
422453
}
423454

455+
std::optional<ManifestFileIterator> IcebergMetadata::tryGetManifestFile(const String & filename) const
456+
{
457+
auto manifest_file_it = manifest_files_by_name.find(filename);
458+
if (manifest_file_it != manifest_files_by_name.end())
459+
return ManifestFileIterator{manifest_file_it};
460+
return std::nullopt;
461+
}
462+
424463
ManifestListIterator IcebergMetadata::getManifestList(const String & filename) const
425464
{
426465
auto manifest_file_it = manifest_lists_by_name.find(filename);
@@ -464,10 +503,10 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
464503
return cached_unprunned_files_for_current_snapshot.value();
465504

466505
Strings data_files;
467-
for (const auto & manifest_entry : *current_snapshot->manifest_list_iterator)
506+
for (const auto & manifest_list_entry : *(current_snapshot->manifest_list_iterator))
468507
{
469508
const auto & partition_columns_ids
470-
= getRelevantPartitionColumnIds(manifest_entry.manifest_file, schema_processor, current_schema_id);
509+
= getRelevantPartitionColumnIds(manifest_list_entry.manifest_file, schema_processor, current_schema_id);
471510
const auto & partition_pruning_columns_names_and_types
472511
= schema_processor.tryGetFieldsCharacteristics(current_schema_id, partition_columns_ids);
473512

@@ -476,7 +515,7 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
476515
const KeyCondition partition_key_condition(
477516
filter_dag, getContext(), partition_pruning_columns_names_and_types.getNames(), partition_minmax_idx_expr);
478517

479-
const auto & data_files_in_manifest = manifest_entry.manifest_file->getFiles();
518+
const auto & data_files_in_manifest = manifest_list_entry.manifest_file->getFiles();
480519
for (const auto & manifest_file_entry : data_files_in_manifest)
481520
{
482521
if (manifest_file_entry.status != ManifestEntryStatus::DELETED)

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,8 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
130130

131131
Strings getDataFilesImpl(const ActionsDAG * filter_dag) const;
132132

133+
std::optional<Iceberg::ManifestFileIterator> tryGetManifestFile(const String & filename) const;
134+
133135
//Fields are needed only for providing dynamic polymorphism
134136
std::unordered_map<String, String> column_name_to_physical_name;
135137
DataLakePartitionColumns partition_columns;

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <Poco/JSON/Parser.h>
1313
#include "DataTypes/DataTypeTuple.h"
1414

15+
# include <Common/logger_useful.h>
1516

1617
namespace DB::ErrorCodes
1718
{
@@ -48,6 +49,7 @@ std::vector<DB::Range> ManifestFileEntry::getPartitionRanges(const std::vector<I
4849

4950
const std::vector<PartitionColumnInfo> & ManifestFileContent::getPartitionColumnInfos() const
5051
{
52+
chassert(impl != nullptr);
5153
return impl->partition_column_infos;
5254
}
5355

@@ -69,10 +71,21 @@ ManifestFileContentImpl::ManifestFileContentImpl(
6971
Int64 inherited_sequence_number)
7072
{
7173
this->schema_id = schema_id_;
74+
75+
LOG_DEBUG(&Poco::Logger::get("ManifestFileContentImpl"), "dataSchema: {}", manifest_file_reader_->dataSchema().toJson(true));
76+
77+
LOG_DEBUG(&Poco::Logger::get("ManifestFileContentImpl"), "readerSchema: {}", manifest_file_reader_->readerSchema().toJson(true));
78+
79+
7280
avro::NodePtr root_node = manifest_file_reader_->dataSchema().root();
7381

7482
auto [name_to_index, name_to_data_type, manifest_file_header] = getColumnsAndTypesFromAvroByNames(
75-
root_node, {"status", "data_file", "sequence_number"}, {avro::Type::AVRO_INT, avro::Type::AVRO_RECORD, avro::Type::AVRO_LONG});
83+
root_node, {"status", "data_file", "sequence_number"}, {avro::Type::AVRO_INT, avro::Type::AVRO_RECORD, avro::Type::AVRO_UNION});
84+
85+
for (const auto & [key, value] : name_to_index)
86+
{
87+
LOG_DEBUG(&Poco::Logger::get("ManifestFileContentImpl name_to_index"), "key: {} value: {}", key, value);
88+
}
7689

7790
if (name_to_index.find("status") == name_to_index.end())
7891
throw Exception(DB::ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Required columns are not found in manifest file: status");
@@ -178,16 +191,16 @@ ManifestFileContentImpl::ManifestFileContentImpl(
178191
partition_columns.push_back(removeNullable(big_partition_tuple->getColumnPtr(i)));
179192
}
180193

181-
if (columns.at(name_to_index.at("sequence_number"))->getDataType() != TypeIndex::Nullable)
182-
{
183-
throw Exception(
184-
DB::ErrorCodes::ILLEGAL_COLUMN,
185-
"The parsed column from Avro file of `sequence_number` field should be Int64 type, got {}",
186-
columns.at(name_to_index.at("sequence_number"))->getFamilyName());
187-
}
188194
std::optional<const ColumnNullable *> sequence_number_column = std::nullopt;
189195
if (format_version_ > 1)
190196
{
197+
if (columns.at(name_to_index.at("sequence_number"))->getDataType() != TypeIndex::Nullable)
198+
{
199+
throw Exception(
200+
DB::ErrorCodes::ILLEGAL_COLUMN,
201+
"The parsed column from Avro file of `sequence_number` field should be Int64 type, got {}",
202+
columns.at(name_to_index.at("sequence_number"))->getFamilyName());
203+
}
191204
sequence_number_column = assert_cast<const ColumnNullable *>(columns.at(name_to_index.at("sequence_number")).get());
192205
}
193206

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11

2+
#include <typeinfo>
23
#include "config.h"
34

45
#if USE_AVRO
56

67
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
78
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
9+
# include <Common/logger_useful.h>
810

911
namespace DB::ErrorCodes
1012
{
@@ -55,22 +57,24 @@ std::tuple<NameToIndex, NameToDataType, DB::Block> getColumnsAndTypesFromAvroByN
5557
size_t leaves_num = root_node->leaves();
5658
for (size_t i = 0; i < leaves_num; ++i)
5759
{
58-
const auto & name = root_node->leafAt(static_cast<int>(i))->name();
60+
const auto & name = root_node->nameAt(static_cast<int>(i));
61+
5962
if (initial_index_by_name.find(name) != initial_index_by_name.end())
6063
initial_index_by_name[name] = i;
6164
}
6265

6366

6467
size_t current_new_index = 0;
6568
ColumnsWithTypeAndName columns_to_add = {};
66-
for (const auto & name : names)
69+
for (size_t i = 0; i < names.size(); ++i)
6770
{
71+
const auto & name = names[i];
6872
if (initial_index_by_name.at(name).has_value())
6973
{
7074
name_to_index.insert({name, current_new_index++});
7175
const auto node = root_node->leafAt(static_cast<int>(initial_index_by_name.at(name).value()));
7276
const size_t initial_index = initial_index_by_name.at(name).value();
73-
if (node->type() != expected_types.at(initial_index))
77+
if (node->type() != expected_types.at(i))
7478
{
7579
throw Exception(
7680
ErrorCodes::BAD_TYPE_OF_FIELD,

tests/integration/test_storage_iceberg/__init__.py

100644100755
File mode changed.

tests/integration/test_storage_iceberg/configs/config.d/cluster.xml

100644100755
File mode changed.

tests/integration/test_storage_iceberg/configs/config.d/filesystem_caches.xml

100644100755
File mode changed.

tests/integration/test_storage_iceberg/configs/config.d/named_collections.xml

100644100755
File mode changed.

tests/integration/test_storage_iceberg/configs/config.d/query_log.xml

100644100755
File mode changed.

tests/integration/test_storage_iceberg/configs/users.d/users.xml

100644100755
File mode changed.

0 commit comments

Comments
 (0)