Skip to content

Commit 088fb03

Browse files
Merge branch 'antalya-25.8' into qa/update-broken-tests
2 parents 155db9c + bd67f68 commit 088fb03

15 files changed

Lines changed: 747 additions & 29 deletions

File tree

docs/en/engines/table-engines/mergetree-family/part_export.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,18 @@ Source and destination tables must be 100% compatible:
6060
- **Default**: `0`
6161
- **Description**: Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. Using this might break idempotency, use it with care.
6262

63+
### export_merge_tree_part_throw_on_pending_mutations
64+
65+
- **Type**: `bool`
66+
- **Default**: `true`
67+
- **Description**: If set to true, throws if pending mutations exists for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affetct part/partition x, all the other parts/partition will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
68+
69+
### export_merge_tree_part_throw_on_pending_patch_parts
70+
71+
- **Type**: `bool`
72+
- **Default**: `true`
73+
- **Description**: If set to true, throws if pending patch parts exists for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affetct part/partition x, all the other parts/partition will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
74+
6375
## Examples
6476

6577
### Basic Export to S3

docs/en/engines/table-engines/mergetree-family/partition_export.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@ TO TABLE [destination_database.]destination_table
7070
- `error` - Throw an error if the file already exists
7171
- `overwrite` - Overwrite the file
7272

73+
### export_merge_tree_part_throw_on_pending_mutations
74+
75+
- **Type**: `bool`
76+
- **Default**: `true`
77+
- **Description**: If set to true, throws if pending mutations exists for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affetct part/partition x, all the other parts/partition will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
78+
79+
### export_merge_tree_part_throw_on_pending_patch_parts
80+
81+
- **Type**: `bool`
82+
- **Default**: `true`
83+
- **Description**: If set to true, throws if pending patch parts exists for a given part. Note that by default mutations are applied to all parts, which means that if a mutation in practice would only affetct part/partition x, all the other parts/partition will throw upon export. The exception is when the `IN PARTITION` clause was used in the mutation command. Note the `IN PARTITION` clause is not properly implemented for plain MergeTree tables.
84+
7385
## Examples
7486

7587
### Basic Export to S3

src/Common/ErrorCodes.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,7 @@
648648
M(1002, UNKNOWN_EXCEPTION) \
649649
M(1003, SSH_EXCEPTION) \
650650
M(1004, STARTUP_SCRIPTS_ERROR) \
651+
M(1005, PENDING_MUTATIONS_NOT_ALLOWED) \
651652
/* See END */
652653

653654
#ifdef APPLY_FOR_EXTERNAL_ERROR_CODES
@@ -664,7 +665,7 @@ namespace ErrorCodes
664665
APPLY_FOR_ERROR_CODES(M)
665666
#undef M
666667

667-
constexpr ErrorCode END = 1004;
668+
constexpr ErrorCode END = 1005;
668669
ErrorPairHolder values[END + 1]{};
669670

670671
struct ErrorCodesNames

src/Core/Settings.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6920,6 +6920,12 @@ This is not a hard limit, and it highly depends on the output format granularity
69206920
DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"(
69216921
Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit.
69226922
This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
6923+
)", 0) \
6924+
DECLARE(Bool, export_merge_tree_part_throw_on_pending_mutations, true, R"(
6925+
Throw an error if there are pending mutations when exporting a merge tree part.
6926+
)", 0) \
6927+
DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"(
6928+
Throw an error if there are pending patch parts when exporting a merge tree part.
69236929
)", 0) \
69246930
DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
69256931
Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
6464
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
6565
{"cluster_table_function_split_granularity", "file", "file", "New setting."},
6666
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
67+
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
68+
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
6769
});
6870
addSettingsChanges(settings_changes_history, "25.8",
6971
{

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ namespace
3030
context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
3131
context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file);
3232
context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file);
33+
34+
/// always skip pending mutations and patch parts because we already validated the parts during query processing
35+
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
36+
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);
37+
3338
return context_copy;
3439
}
3540
}
@@ -144,6 +149,7 @@ void ExportPartitionTaskScheduler::run()
144149
destination_storage_id,
145150
manifest.transaction_id,
146151
getContextCopyWithTaskSettings(storage.getContext(), manifest),
152+
/*allow_outdated_parts*/ true,
147153
[this, key, zk_part_name, manifest, destination_storage]
148154
(MergeTreePartExportManifest::CompletionCallbackResult result)
149155
{

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ namespace Setting
217217
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
218218
extern const SettingsBool output_format_parallel_formatting;
219219
extern const SettingsBool output_format_parquet_parallel_encoding;
220+
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
221+
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
220222
}
221223

222224
namespace MergeTreeSetting
@@ -336,6 +338,7 @@ namespace ErrorCodes
336338
extern const int TOO_LARGE_LIGHTWEIGHT_UPDATES;
337339
extern const int UNKNOWN_TABLE;
338340
extern const int FILE_ALREADY_EXISTS;
341+
extern const int PENDING_MUTATIONS_NOT_ALLOWED;
339342
}
340343

341344
static void checkSuspiciousIndices(const ASTFunction * index_function)
@@ -6222,6 +6225,7 @@ void MergeTreeData::exportPartToTable(
62226225
const StorageID & destination_storage_id,
62236226
const String & transaction_id,
62246227
ContextPtr query_context,
6228+
bool allow_outdated_parts,
62256229
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback)
62266230
{
62276231
auto dest_storage = DatabaseCatalog::instance().getTable(destination_storage_id, query_context);
@@ -6260,6 +6264,43 @@ void MergeTreeData::exportPartToTable(
62606264
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to export in table '{}'",
62616265
part_name, getStorageID().getFullTableName());
62626266

6267+
if (part->getState() == MergeTreeDataPartState::Outdated && !allow_outdated_parts)
6268+
throw Exception(
6269+
ErrorCodes::BAD_ARGUMENTS,
6270+
"Part {} is in the outdated state and cannot be exported",
6271+
part_name);
6272+
6273+
const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations];
6274+
const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts];
6275+
6276+
MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
6277+
{
6278+
.metadata_version = source_metadata_ptr->getMetadataVersion(),
6279+
.min_part_metadata_version = part->getMetadataVersion(),
6280+
.need_data_mutations = throw_on_pending_mutations,
6281+
.need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts,
6282+
.need_patch_parts = throw_on_pending_patch_parts,
6283+
};
6284+
6285+
const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params);
6286+
6287+
const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context);
6288+
6289+
/// re-check `throw_on_pending_mutations` because `pending_mutations` might have been filled due to `throw_on_pending_patch_parts`
6290+
if (throw_on_pending_mutations && alter_conversions->hasMutations())
6291+
{
6292+
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
6293+
"Part {} can not be exported because there are pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false",
6294+
part_name);
6295+
}
6296+
6297+
if (alter_conversions->hasPatches())
6298+
{
6299+
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
6300+
"Part {} can not be exported because there are pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false",
6301+
part_name);
6302+
}
6303+
62636304
{
62646305
const auto format_settings = getFormatSettings(query_context);
62656306
MergeTreePartExportManifest manifest(

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
989989
const StorageID & destination_storage_id,
990990
const String & transaction_id,
991991
ContextPtr query_context,
992+
bool allow_outdated_parts = false,
992993
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback = {});
993994

994995
void killExportPart(const String & transaction_id);

src/Storages/StorageMerge.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ namespace DB
6868
namespace Setting
6969
{
7070
extern const SettingsBool allow_experimental_analyzer;
71+
extern const SettingsBool distributed_aggregation_memory_efficient;
7172
extern const SettingsSeconds lock_acquire_timeout;
7273
extern const SettingsFloat max_streams_multiplier_for_merge_tables;
7374
extern const SettingsUInt64 merge_table_max_tables_to_look_for_schema_inference;
@@ -550,16 +551,20 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
550551

551552
pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines));
552553

553-
if (!query_info.input_order_info)
554+
// It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
555+
// Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
556+
// because narrowPipe doesn't preserve order. Also, if we are doing a memory efficient distributed agggregation, bucket
557+
// order must be preserved.
558+
const bool should_not_narrow = query_info.input_order_info || (
559+
context->getSettingsRef()[Setting::distributed_aggregation_memory_efficient]
560+
&& common_processed_stage == QueryProcessingStage::Enum::WithMergeableState);
561+
if (!should_not_narrow)
554562
{
555563
size_t tables_count = selected_tables.size();
556564
Float64 num_streams_multiplier = std::min(
557565
tables_count, std::max(1UL, static_cast<size_t>(context->getSettingsRef()[Setting::max_streams_multiplier_for_merge_tables])));
558566
size_t num_streams = static_cast<size_t>(requested_num_streams * num_streams_multiplier);
559567

560-
// It's possible to have many tables read from merge, resize(num_streams) might open too many files at the same time.
561-
// Using narrowPipe instead. But in case of reading in order of primary key, we cannot do it,
562-
// because narrowPipe doesn't preserve order.
563568
pipeline.narrow(num_streams);
564569
}
565570

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ namespace Setting
202202
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
203203
extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file;
204204
extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file;
205+
extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations;
206+
extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts;
205207
}
206208

207209
namespace MergeTreeSetting
@@ -305,6 +307,7 @@ namespace ErrorCodes
305307
extern const int CANNOT_FORGET_PARTITION;
306308
extern const int TIMEOUT_EXCEEDED;
307309
extern const int INVALID_SETTING_VALUE;
310+
extern const int PENDING_MUTATIONS_NOT_ALLOWED;
308311
}
309312

310313
namespace ServerSetting
@@ -8194,18 +8197,54 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand &
81948197

81958198
ops.emplace_back(zkutil::makeCreateRequest(partition_exports_path, "", zkutil::CreateMode::Persistent));
81968199

8197-
auto data_parts_lock = lockParts();
8200+
DataPartsVector parts;
81988201

8199-
const auto parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock);
8202+
{
8203+
auto data_parts_lock = lockParts();
8204+
parts = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, &data_parts_lock);
8205+
}
82008206

82018207
if (parts.empty())
82028208
{
82038209
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Partition {} doesn't exist", partition_id);
82048210
}
82058211

8212+
const bool throw_on_pending_mutations = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_mutations];
8213+
const bool throw_on_pending_patch_parts = query_context->getSettingsRef()[Setting::export_merge_tree_part_throw_on_pending_patch_parts];
8214+
8215+
MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
8216+
{
8217+
.metadata_version = getInMemoryMetadataPtr()->getMetadataVersion(),
8218+
.min_part_metadata_version = MergeTreeData::getMinMetadataVersion(parts),
8219+
.need_data_mutations = throw_on_pending_mutations,
8220+
.need_alter_mutations = throw_on_pending_mutations || throw_on_pending_patch_parts,
8221+
.need_patch_parts = throw_on_pending_patch_parts,
8222+
};
8223+
8224+
const auto mutations_snapshot = getMutationsSnapshot(mutations_snapshot_params);
8225+
82068226
std::vector<String> part_names;
82078227
for (const auto & part : parts)
82088228
{
8229+
const auto alter_conversions = getAlterConversionsForPart(part, mutations_snapshot, query_context);
8230+
8231+
/// re-check `throw_on_pending_mutations` because `pending_mutations` might have been filled due to `throw_on_pending_patch_parts`
8232+
if (alter_conversions->hasMutations() && throw_on_pending_mutations)
8233+
{
8234+
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
8235+
"Partition {} can not be exported because the part {} has pending mutations. Either wait for the mutations to be applied or set `export_merge_tree_part_throw_on_pending_mutations` to false",
8236+
partition_id,
8237+
part->name);
8238+
}
8239+
8240+
if (alter_conversions->hasPatches())
8241+
{
8242+
throw Exception(ErrorCodes::PENDING_MUTATIONS_NOT_ALLOWED,
8243+
"Partition {} can not be exported because the part {} has pending patch parts. Either wait for the patch parts to be applied or set `export_merge_tree_part_throw_on_pending_patch_parts` to false",
8244+
partition_id,
8245+
part->name);
8246+
}
8247+
82098248
part_names.push_back(part->name);
82108249
}
82118250

0 commit comments

Comments
 (0)