Skip to content

Commit 14076d5

Browse files
authored
Merge branch 'antalya-25.8' into backports/antalya-25.8/87687
2 parents 0c7dc63 + d6d27ce commit 14076d5

15 files changed

Lines changed: 535 additions & 20 deletions

File tree

src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ NamesAndTypesList SchemaConverter::inferSchema()
127127
return res;
128128
}
129129

130-
std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element) const
130+
std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const
131131
{
132132
if (!column_mapper)
133133
return element.name;
@@ -142,8 +142,19 @@ std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElem
142142
auto it = map.find(element.field_id);
143143
if (it == map.end())
144144
throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id);
145-
auto split = Nested::splitName(std::string_view(it->second), /*reverse=*/ true);
146-
return split.second.empty() ? split.first : split.second;
145+
146+
/// At top level (empty path), return the full mapped name. For nested
147+
/// elements, strip the parent path prefix to get the child name.
148+
if (current_path.empty())
149+
return it->second;
150+
151+
/// Strip "current_path." prefix to get child name (preserves dots in child names)
152+
std::string_view mapped = it->second;
153+
if (mapped.starts_with(current_path) && mapped.size() > current_path.size()
154+
&& mapped[current_path.size()] == '.')
155+
return mapped.substr(current_path.size() + 1);
156+
157+
return mapped;
147158
}
148159

149160
void SchemaConverter::processSubtree(TraversalNode & node)
@@ -160,7 +171,7 @@ void SchemaConverter::processSubtree(TraversalNode & node)
160171

161172
if (node.schema_context == SchemaContext::None)
162173
{
163-
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element));
174+
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element, node.name));
164175

165176
if (sample_block)
166177
{
@@ -589,7 +600,7 @@ void SchemaConverter::processSubtreeTuple(TraversalNode & node)
589600
std::vector<String> element_names_in_file;
590601
for (size_t i = 0; i < size_t(node.element->num_children); ++i)
591602
{
592-
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx)));
603+
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx), node.name));
593604
std::optional<size_t> idx_in_output_tuple = i - skipped_unsupported_columns;
594605
if (lookup_by_name)
595606
{

src/Processors/Formats/Impl/Parquet/SchemaConverter.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,10 @@ struct SchemaConverter
137137
DataTypePtr & out_inferred_type, std::optional<GeoColumnMetadata> geo_metadata) const;
138138

139139
/// Returns element.name or a corresponding name from ColumnMapper.
140-
/// For tuple elements, that's just the element name like `x`, not the whole path like `t.x`.
141-
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element) const;
140+
/// For nested tuple elements, returns just the element name like `x`, not the whole path like `t.x`.
141+
/// For top-level columns (when current_path is empty), returns the full mapped name to support
142+
/// column names with dots (e.g., `integer.col` in Iceberg).
143+
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const;
142144
};
143145

144146
}

src/Storages/ColumnsDescription.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,15 @@ NamesAndTypesList ColumnsDescription::getInsertable() const
469469
return ret;
470470
}
471471

472+
/// Returns the name and type of every column whose default kind is not Ephemeral.
/// The result follows the iteration order of `columns`.
NamesAndTypesList ColumnsDescription::getReadable() const
473+
{
474+
NamesAndTypesList ret;
475+
for (const auto & col : columns)
476+
/// Ephemeral columns are the only kind filtered out here.
if (col.default_desc.kind != ColumnDefaultKind::Ephemeral)
477+
ret.emplace_back(col.name, col.type);
478+
return ret;
479+
}
480+
472481
NamesAndTypesList ColumnsDescription::getMaterialized() const
473482
{
474483
NamesAndTypesList ret;
@@ -851,7 +860,6 @@ std::optional<ColumnDefault> ColumnsDescription::getDefault(const String & colum
851860
return {};
852861
}
853862

854-
855863
bool ColumnsDescription::hasCompressionCodec(const String & column_name) const
856864
{
857865
const auto it = columns.get<1>().find(column_name);

src/Storages/ColumnsDescription.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ class ColumnsDescription : public IHints<>
149149
NamesAndTypesList getOrdinary() const;
150150
NamesAndTypesList getMaterialized() const;
151151
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
152+
NamesAndTypesList getReadable() const; /// ordinary + materialized + aliases (no ephemeral)
152153
NamesAndTypesList getAliases() const;
153154
NamesAndTypesList getEphemeral() const;
154155
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@
44
#include <Storages/MergeTree/MergeTreeData.h>
55
#include <Interpreters/Context.h>
66
#include <Interpreters/DatabaseCatalog.h>
7+
#include <Interpreters/inplaceBlockConversions.h>
78
#include <Core/Settings.h>
89
#include <Interpreters/ExpressionActions.h>
910
#include <Processors/Executors/CompletedPipelineExecutor.h>
1011
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
1112
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
1213
#include <Processors/QueryPlan/QueryPlan.h>
14+
#include <Processors/QueryPlan/ExpressionStep.h>
1315
#include <QueryPipeline/QueryPipelineBuilder.h>
1416
#include <Common/Exception.h>
1517
#include <Common/ProfileEventsScope.h>
1618
#include <Storages/MergeTree/ExportList.h>
1719
#include <Formats/FormatFactory.h>
20+
#include <Databases/enableAllExperimentalSettings.h>
1821

1922
namespace ProfileEvents
2023
{
@@ -42,6 +45,43 @@ namespace Setting
4245
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
4346
}
4447

48+
namespace
49+
{
50+
/// Appends an ExpressionStep to `plan_for_part` that evaluates the default expressions of the
/// storage's readable columns that are missing from `header`.
///
/// `header`            - header of the plan built so far (the caller passes the plan's current header).
/// `storage_metadata`  - supplies the column descriptions, including default expressions.
/// `local_context`     - copied before use; the copy gets all experimental settings enabled.
/// `plan_for_part`     - receives the extra step; left untouched when no DAG is produced.
void materializeSpecialColumns(
51+
const SharedHeader & header,
52+
const StorageMetadataPtr & storage_metadata,
53+
const ContextPtr & local_context,
54+
QueryPlan & plan_for_part
55+
)
56+
{
57+
/// All non-ephemeral columns of the source table.
const auto readable_columns = storage_metadata->getColumns().getReadable();
58+
59+
// Enable all experimental settings for default expressions
60+
// (same pattern as in IMergeTreeReader::evaluateMissingDefaults).
// The settings are changed on a copy, so `local_context` itself is not modified.
61+
auto context_for_defaults = Context::createCopy(local_context);
62+
enableAllExperimentalSettings(context_for_defaults);
63+
64+
/// Build a DAG that computes the readable columns absent from `header`.
/// NOTE(review): presumably the result is empty when nothing is missing - confirm
/// against evaluateMissingDefaults.
auto defaults_dag = evaluateMissingDefaults(
65+
*header,
66+
readable_columns,
67+
storage_metadata->getColumns(),
68+
context_for_defaults);
69+
70+
if (defaults_dag)
71+
{
72+
/// Ensure columns are in the correct order matching readable_columns.
/// NOTE(review): relies on removeUnusedActions keeping outputs in the order of the
/// requested names - confirm against ActionsDAG.
73+
defaults_dag->removeUnusedActions(readable_columns.getNames(), false);
74+
defaults_dag->addMaterializingOutputActions(/*materialize_sparse=*/ false);
75+
76+
/// The DAG is moved into the step; `defaults_dag` must not be used afterwards.
auto expression_step = std::make_unique<ExpressionStep>(
77+
header,
78+
std::move(*defaults_dag));
79+
expression_step->setStepDescription("Compute alias and default expressions for export");
80+
plan_for_part.addStep(std::move(expression_step));
81+
}
82+
}
83+
}
84+
4585
ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
4686
: storage(storage_),
4787
manifest(manifest_)
@@ -58,7 +98,8 @@ bool ExportPartTask::executeStep()
5898

5999
const auto & metadata_snapshot = manifest.metadata_snapshot;
60100

61-
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
101+
/// Read only physical columns from the part
102+
const auto columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
62103

63104
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
64105

@@ -146,6 +187,10 @@ bool ExportPartTask::executeStep()
146187
local_context,
147188
getLogger("ExportPartition"));
148189

190+
/// Materialized and alias columns must be exportable to object storage, but object storage engines do not support such columns themselves.
191+
/// As a workaround, compute these columns as part of the plan before the export, so they can be written to destination tables that declare matching columns.
192+
materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part);
193+
149194
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");
150195

151196
QueryPlanOptimizationSettings optimization_settings(local_context);

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6242,7 +6242,13 @@ void MergeTreeData::exportPartToTable(
62426242
auto source_metadata_ptr = getInMemoryMetadataPtr();
62436243
auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();
62446244

6245-
if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
6245+
const auto & source_columns = source_metadata_ptr->getColumns();
6246+
6247+
const auto & destination_columns = destination_metadata_ptr->getColumns();
6248+
6249+
/// Compare the source's readable columns with the destination's insertable columns.
6250+
/// Readable columns exclude ephemeral ones, so the source's ephemeral columns do not take part in the check.
6251+
if (source_columns.getReadable().sizeOfDifference(destination_columns.getInsertable()))
62466252
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
62476253

62486254
if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_man
4343
data_manifest_file_entry_.file_path_key.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path_key))
4444
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
4545
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
46+
, file_format(data_manifest_file_entry_.file_format)
4647
, sequence_number(data_manifest_file_entry_.added_sequence_number)
4748
{
4849
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
@@ -59,10 +60,11 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(
5960
ObjectStoragePtr resolved_storage,
6061
const String & resolved_key)
6162
: PathWithMetadata(resolved_key, std::nullopt,
62-
data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
63+
data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
6364
resolved_storage)
6465
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
6566
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
67+
, file_format(data_manifest_file_entry_.file_format)
6668
, sequence_number(data_manifest_file_entry_.added_sequence_number)
6769
{
6870
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
2222
/// It is used to filter position deletes objects by data file path.
2323
/// It is also used to create a filter for the data object in the position delete transform.
2424
explicit IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_);
25-
25+
2626
/// Sometimes data files are located outside the table location and even in a different storage.
2727
explicit IcebergDataObjectInfo(
2828
Iceberg::ManifestFileEntry data_manifest_file_entry_,
@@ -50,6 +50,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
5050

5151
String data_object_file_path_key; // Full path to the data object file
5252
Int32 underlying_format_read_schema_id;
53+
String file_format; // Format of the data file (e.g., "PARQUET", "ORC", "AVRO")
5354
std::vector<Iceberg::PositionDeleteObject> position_deletes_objects;
5455
std::vector<Iceberg::ManifestFileEntry> equality_deletes_objects;
5556
Int64 sequence_number;

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ void IcebergMetadata::addDeleteTransformers(
10771077

10781078
auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
10791079
persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context);
1080-
1080+
10811081
PathWithMetadata delete_file_object(resolved_delete_key, std::nullopt, delete_file.file_path, delete_storage_to_use);
10821082
{
10831083
auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log);
@@ -1198,8 +1198,7 @@ ColumnMapperPtr IcebergMetadata::getColumnMapperForObject(ObjectInfoPtr object_i
11981198
IcebergDataObjectInfo * iceberg_object_info = dynamic_cast<IcebergDataObjectInfo *>(object_info.get());
11991199
if (!iceberg_object_info)
12001200
return nullptr;
1201-
auto configuration_ptr = configuration.lock();
1202-
if (Poco::toLower(configuration_ptr->getFormat()) != "parquet")
1201+
if (Poco::toLower(iceberg_object_info->file_format) != "parquet")
12031202
return nullptr;
12041203

12051204
return persistent_components.schema_processor->getColumnMapperById(iceberg_object_info->underlying_format_read_schema_id);

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8133,7 +8133,9 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
81338133
auto src_snapshot = getInMemoryMetadataPtr();
81348134
auto destination_snapshot = dest_storage->getInMemoryMetadataPtr();
81358135

8136-
if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical()))
8136+
/// Compare the source's readable columns with the destination's insertable columns.
8137+
/// Readable columns exclude ephemeral ones, so the source's ephemeral columns do not take part in the check.
8138+
if (src_snapshot->getColumns().getReadable().sizeOfDifference(destination_snapshot->getColumns().getInsertable()))
81378139
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
81388140

81398141
if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST()))

0 commit comments

Comments
 (0)