Skip to content

Commit 386df14

Browse files
Merge branch 'antalya-25.8' into mf_25.8_hybrid3
2 parents a789345 + d6d27ce commit 386df14

18 files changed

Lines changed: 575 additions & 23 deletions

File tree

src/Common/FailPoint.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ static struct InitFiu
118118
REGULAR(sleep_in_logs_flush) \
119119
ONCE(smt_commit_exception_before_op) \
120120
ONCE(backup_add_empty_memory_table) \
121-
REGULAR(refresh_task_stop_racing_for_running_refresh)
121+
REGULAR(refresh_task_stop_racing_for_running_refresh) \
122+
ONCE(database_iceberg_gcs)
122123

123124

124125
namespace FailPoints

src/Databases/DataLake/ICatalog.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,18 @@
55

66
#include <filesystem>
77

8+
#include <Common/FailPoint.h>
9+
810
namespace DB::ErrorCodes
911
{
1012
extern const int NOT_IMPLEMENTED;
1113
extern const int LOGICAL_ERROR;
14+
extern const int BAD_ARGUMENTS;
15+
}
16+
17+
namespace DB::FailPoints
18+
{
19+
extern const char database_iceberg_gcs[];
1220
}
1321

1422
namespace DataLake
@@ -49,9 +57,14 @@ StorageType parseStorageTypeFromString(const std::string & type)
4957
}
5058
if (capitalize_first_letter(storage_type_str) == "File")
5159
storage_type_str = "Local";
52-
53-
if (capitalize_first_letter(storage_type_str) == "S3a")
60+
else if (capitalize_first_letter(storage_type_str) == "S3a" || storage_type_str == "oss" || storage_type_str == "gs")
61+
{
62+
fiu_do_on(DB::FailPoints::database_iceberg_gcs,
63+
{
64+
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Google cloud storage converts to S3");
65+
});
5466
storage_type_str = "S3";
67+
}
5568

5669
auto storage_type = magic_enum::enum_cast<StorageType>(capitalize_first_letter(storage_type_str));
5770

src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ NamesAndTypesList SchemaConverter::inferSchema()
127127
return res;
128128
}
129129

130-
std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element) const
130+
std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const
131131
{
132132
if (!column_mapper)
133133
return element.name;
@@ -142,8 +142,19 @@ std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElem
142142
auto it = map.find(element.field_id);
143143
if (it == map.end())
144144
throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id);
145-
auto split = Nested::splitName(std::string_view(it->second), /*reverse=*/ true);
146-
return split.second.empty() ? split.first : split.second;
145+
146+
/// At top level (empty path), return the full mapped name. For nested
147+
/// elements, strip the parent path prefix to get the child name.
148+
if (current_path.empty())
149+
return it->second;
150+
151+
/// Strip "current_path." prefix to get child name (preserves dots in child names)
152+
std::string_view mapped = it->second;
153+
if (mapped.starts_with(current_path) && mapped.size() > current_path.size()
154+
&& mapped[current_path.size()] == '.')
155+
return mapped.substr(current_path.size() + 1);
156+
157+
return mapped;
147158
}
148159

149160
void SchemaConverter::processSubtree(TraversalNode & node)
@@ -160,7 +171,7 @@ void SchemaConverter::processSubtree(TraversalNode & node)
160171

161172
if (node.schema_context == SchemaContext::None)
162173
{
163-
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element));
174+
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element, node.name));
164175

165176
if (sample_block)
166177
{
@@ -589,7 +600,7 @@ void SchemaConverter::processSubtreeTuple(TraversalNode & node)
589600
std::vector<String> element_names_in_file;
590601
for (size_t i = 0; i < size_t(node.element->num_children); ++i)
591602
{
592-
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx)));
603+
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx), node.name));
593604
std::optional<size_t> idx_in_output_tuple = i - skipped_unsupported_columns;
594605
if (lookup_by_name)
595606
{

src/Processors/Formats/Impl/Parquet/SchemaConverter.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,10 @@ struct SchemaConverter
137137
DataTypePtr & out_inferred_type, std::optional<GeoColumnMetadata> geo_metadata) const;
138138

139139
/// Returns element.name or a corresponding name from ColumnMapper.
140-
/// For tuple elements, that's just the element name like `x`, not the whole path like `t.x`.
141-
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element) const;
140+
/// For nested tuple elements, returns just the element name like `x`, not the whole path like `t.x`.
141+
/// For top-level columns (when current_path is empty), returns the full mapped name to support
142+
/// column names with dots (e.g., `integer.col` in Iceberg).
143+
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const;
142144
};
143145

144146
}

src/Storages/ColumnsDescription.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,15 @@ NamesAndTypesList ColumnsDescription::getInsertable() const
469469
return ret;
470470
}
471471

472+
NamesAndTypesList ColumnsDescription::getReadable() const
473+
{
474+
NamesAndTypesList ret;
475+
for (const auto & col : columns)
476+
if (col.default_desc.kind != ColumnDefaultKind::Ephemeral)
477+
ret.emplace_back(col.name, col.type);
478+
return ret;
479+
}
480+
472481
NamesAndTypesList ColumnsDescription::getMaterialized() const
473482
{
474483
NamesAndTypesList ret;
@@ -851,7 +860,6 @@ std::optional<ColumnDefault> ColumnsDescription::getDefault(const String & colum
851860
return {};
852861
}
853862

854-
855863
bool ColumnsDescription::hasCompressionCodec(const String & column_name) const
856864
{
857865
const auto it = columns.get<1>().find(column_name);

src/Storages/ColumnsDescription.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ class ColumnsDescription : public IHints<>
149149
NamesAndTypesList getOrdinary() const;
150150
NamesAndTypesList getMaterialized() const;
151151
NamesAndTypesList getInsertable() const; /// ordinary + ephemeral
152+
NamesAndTypesList getReadable() const; /// ordinary + materialized + aliases (no ephemeral)
152153
NamesAndTypesList getAliases() const;
153154
NamesAndTypesList getEphemeral() const;
154155
NamesAndTypesList getAllPhysical() const; /// ordinary + materialized.

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@
44
#include <Storages/MergeTree/MergeTreeData.h>
55
#include <Interpreters/Context.h>
66
#include <Interpreters/DatabaseCatalog.h>
7+
#include <Interpreters/inplaceBlockConversions.h>
78
#include <Core/Settings.h>
89
#include <Interpreters/ExpressionActions.h>
910
#include <Processors/Executors/CompletedPipelineExecutor.h>
1011
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
1112
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
1213
#include <Processors/QueryPlan/QueryPlan.h>
14+
#include <Processors/QueryPlan/ExpressionStep.h>
1315
#include <QueryPipeline/QueryPipelineBuilder.h>
1416
#include <Common/Exception.h>
1517
#include <Common/ProfileEventsScope.h>
1618
#include <Storages/MergeTree/ExportList.h>
1719
#include <Formats/FormatFactory.h>
20+
#include <Databases/enableAllExperimentalSettings.h>
1821

1922
namespace ProfileEvents
2023
{
@@ -42,6 +45,43 @@ namespace Setting
4245
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
4346
}
4447

48+
namespace
49+
{
50+
void materializeSpecialColumns(
51+
const SharedHeader & header,
52+
const StorageMetadataPtr & storage_metadata,
53+
const ContextPtr & local_context,
54+
QueryPlan & plan_for_part
55+
)
56+
{
57+
const auto readable_columns = storage_metadata->getColumns().getReadable();
58+
59+
// Enable all experimental settings for default expressions
60+
// (same pattern as in IMergeTreeReader::evaluateMissingDefaults)
61+
auto context_for_defaults = Context::createCopy(local_context);
62+
enableAllExperimentalSettings(context_for_defaults);
63+
64+
auto defaults_dag = evaluateMissingDefaults(
65+
*header,
66+
readable_columns,
67+
storage_metadata->getColumns(),
68+
context_for_defaults);
69+
70+
if (defaults_dag)
71+
{
72+
/// Ensure columns are in the correct order matching readable_columns
73+
defaults_dag->removeUnusedActions(readable_columns.getNames(), false);
74+
defaults_dag->addMaterializingOutputActions(/*materialize_sparse=*/ false);
75+
76+
auto expression_step = std::make_unique<ExpressionStep>(
77+
header,
78+
std::move(*defaults_dag));
79+
expression_step->setStepDescription("Compute alias and default expressions for export");
80+
plan_for_part.addStep(std::move(expression_step));
81+
}
82+
}
83+
}
84+
4585
ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_)
4686
: storage(storage_),
4787
manifest(manifest_)
@@ -58,7 +98,8 @@ bool ExportPartTask::executeStep()
5898

5999
const auto & metadata_snapshot = manifest.metadata_snapshot;
60100

61-
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
101+
/// Read only physical columns from the part
102+
const auto columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
62103

63104
MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;
64105

@@ -146,6 +187,10 @@ bool ExportPartTask::executeStep()
146187
local_context,
147188
getLogger("ExportPartition"));
148189

190+
/// Object storage table engines do not support MATERIALIZED and ALIAS columns, yet exports must still provide their values.
191+
/// As a workaround, materialize these columns before the export so destination tables with matching columns receive concrete values.
192+
materializeSpecialColumns(plan_for_part.getCurrentHeader(), metadata_snapshot, local_context, plan_for_part);
193+
149194
ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, "");
150195

151196
QueryPlanOptimizationSettings optimization_settings(local_context);

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6242,7 +6242,13 @@ void MergeTreeData::exportPartToTable(
62426242
auto source_metadata_ptr = getInMemoryMetadataPtr();
62436243
auto destination_metadata_ptr = dest_storage->getInMemoryMetadataPtr();
62446244

6245-
if (destination_metadata_ptr->getColumns().getAllPhysical().sizeOfDifference(source_metadata_ptr->getColumns().getAllPhysical()))
6245+
const auto & source_columns = source_metadata_ptr->getColumns();
6246+
6247+
const auto & destination_columns = destination_metadata_ptr->getColumns();
6248+
6249+
/// Compare all readable source columns against all insertable destination columns;
6250+
/// this allows ephemeral columns to be skipped in the structure check.
6251+
if (source_columns.getReadable().sizeOfDifference(destination_columns.getInsertable()))
62466252
throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure");
62476253

62486254
if (query_to_string(source_metadata_ptr->getPartitionKeyAST()) != query_to_string(destination_metadata_ptr->getPartitionKeyAST()))

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_man
4343
data_manifest_file_entry_.file_path_key.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path_key))
4444
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
4545
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
46+
, file_format(data_manifest_file_entry_.file_format)
4647
, sequence_number(data_manifest_file_entry_.added_sequence_number)
4748
{
4849
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
@@ -59,10 +60,11 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(
5960
ObjectStoragePtr resolved_storage,
6061
const String & resolved_key)
6162
: PathWithMetadata(resolved_key, std::nullopt,
62-
data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
63+
data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
6364
resolved_storage)
6465
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
6566
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
67+
, file_format(data_manifest_file_entry_.file_format)
6668
, sequence_number(data_manifest_file_entry_.added_sequence_number)
6769
{
6870
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
2222
/// It is used to filter position deletes objects by data file path.
2323
/// It is also used to create a filter for the data object in the position delete transform.
2424
explicit IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_);
25-
25+
2626
/// Sometimes data files are located outside the table location and even in a different storage.
2727
explicit IcebergDataObjectInfo(
2828
Iceberg::ManifestFileEntry data_manifest_file_entry_,
@@ -50,6 +50,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
5050

5151
String data_object_file_path_key; // Full path to the data object file
5252
Int32 underlying_format_read_schema_id;
53+
String file_format; // Format of the data file (e.g., "PARQUET", "ORC", "AVRO")
5354
std::vector<Iceberg::PositionDeleteObject> position_deletes_objects;
5455
std::vector<Iceberg::ManifestFileEntry> equality_deletes_objects;
5556
Int64 sequence_number;

0 commit comments

Comments
 (0)