Skip to content

Commit 7d2a906

Browse files
authored
Merge branch '25.5' into backport/25.5/85904
2 parents 6037908 + 0095286 commit 7d2a906

35 files changed

Lines changed: 604 additions & 99 deletions

cmake/autogenerated_versions.txt

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2,11 +2,11 @@
22

33
# NOTE: VERSION_REVISION has nothing common with DBMS_TCP_PROTOCOL_VERSION,
44
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
5-
SET(VERSION_REVISION 54507)
5+
SET(VERSION_REVISION 54508)
66
SET(VERSION_MAJOR 25)
77
SET(VERSION_MINOR 5)
8-
SET(VERSION_PATCH 10)
9-
SET(VERSION_GITHASH 3ff229371ccedb06146225a6ec5906241d58e04a)
10-
SET(VERSION_DESCRIBE v25.5.10.1-stable)
11-
SET(VERSION_STRING 25.5.10.1)
8+
SET(VERSION_PATCH 11)
9+
SET(VERSION_GITHASH e54ec61bcfe3bdb439f17eebc9725b9ea5c60f9b)
10+
SET(VERSION_DESCRIBE v25.5.11.1-stable)
11+
SET(VERSION_STRING 25.5.11.1)
1212
# end of autochange

src/Access/IAccessStorage.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -567,7 +567,7 @@ std::optional<AuthResult> IAccessStorage::authenticateImpl(
567567

568568
if (skipped_not_allowed_authentication_methods)
569569
{
570-
LOG_INFO(log, "Skipped the check for not allowed authentication methods,"
570+
LOG_INFO(getLogger(), "Skipped the check for not allowed authentication methods,"
571571
"check allow_no_password and allow_plaintext_password settings in the server configuration");
572572
}
573573

src/Columns/ColumnDynamic.cpp

Lines changed: 27 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -1021,54 +1021,49 @@ ColumnPtr ColumnDynamic::compress(bool force_compression) const
10211021
});
10221022
}
10231023

1024-
void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const
1024+
ColumnCheckpointPtr ColumnDynamic::getCheckpoint() const
10251025
{
1026-
auto & nested = assert_cast<ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
1027-
const auto & variants = variant_column_ptr->getVariants();
1028-
1029-
size_t old_size = nested.size();
1030-
chassert(old_size <= variants.size());
1026+
std::unordered_map<String, ColumnCheckpointPtr> variants_checkpoints;
1027+
for (const auto & [name, discr] : variant_info.variant_name_to_discriminator)
1028+
variants_checkpoints[name] = variant_column_ptr->getVariantByGlobalDiscriminator(discr).getCheckpoint();
1029+
return std::make_shared<DynamicColumnCheckpoint>(size(), variants_checkpoints);
1030+
}
10311031

1032-
for (size_t i = 0; i < old_size; ++i)
1033-
{
1034-
variants[i]->updateCheckpoint(*nested[i]);
1035-
}
1032+
void ColumnDynamic::updateCheckpoint(ColumnCheckpoint & checkpoint) const
1033+
{
1034+
auto & variants_checkpoints = assert_cast<DynamicColumnCheckpoint &>(checkpoint).variants_checkpoints;
10361035

1037-
/// If column has new variants since last checkpoint create checkpoints for them.
1038-
if (old_size < variants.size())
1036+
for (const auto & [name, discr] : variant_info.variant_name_to_discriminator)
10391037
{
1040-
nested.resize(variants.size());
1041-
for (size_t i = old_size; i < variants.size(); ++i)
1042-
nested[i] = variants[i]->getCheckpoint();
1038+
auto it = variants_checkpoints.find(name);
1039+
/// If column has new variants since last checkpoint create checkpoints for them.
1040+
if (it == variants_checkpoints.end())
1041+
variants_checkpoints.emplace(name, variant_column_ptr->getVariantByGlobalDiscriminator(discr).getCheckpoint());
1042+
else
1043+
variant_column_ptr->getVariantByGlobalDiscriminator(discr).updateCheckpoint(*it->second);
10431044
}
10441045

10451046
checkpoint.size = size();
10461047
}
10471048

10481049
void ColumnDynamic::rollback(const ColumnCheckpoint & checkpoint)
10491050
{
1050-
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(checkpoint).nested;
1051-
chassert(nested.size() <= variant_column_ptr->getNumVariants());
1052-
1053-
/// The structure hasn't changed, so we can use generic rollback of Variant column
1054-
if (nested.size() == variant_column_ptr->getNumVariants())
1055-
{
1056-
variant_column_ptr->rollback(checkpoint);
1057-
return;
1058-
}
1051+
const auto & variants_checkpoints = assert_cast<const DynamicColumnCheckpoint &>(checkpoint).variants_checkpoints;
10591052

10601053
/// Manually rollback internals of Variant column
10611054
variant_column_ptr->getOffsets().resize_assume_reserved(checkpoint.size);
10621055
variant_column_ptr->getLocalDiscriminators().resize_assume_reserved(checkpoint.size);
10631056

1064-
auto & variants = variant_column_ptr->getVariants();
1065-
for (size_t i = 0; i < nested.size(); ++i)
1066-
variants[i]->rollback(*nested[i]);
1067-
1068-
/// Keep the structure of variant as is but rollback
1069-
/// to 0 variants that are not in the checkpoint.
1070-
for (size_t i = nested.size(); i < variants.size(); ++i)
1071-
variants[i] = variants[i]->cloneEmpty();
1057+
for (const auto & [name, discr] : variant_info.variant_name_to_discriminator)
1058+
{
1059+
auto it = variants_checkpoints.find(name);
1060+
/// Keep the structure of variant as is but rollback
1061+
/// to 0 variants that are not in the checkpoint.
1062+
if (it == variants_checkpoints.end())
1063+
variant_column_ptr->getVariantPtrByGlobalDiscriminator(discr) = variant_column_ptr->getVariantPtrByGlobalDiscriminator(discr)->cloneEmpty();
1064+
else
1065+
variant_column_ptr->getVariantByGlobalDiscriminator(discr).rollback(*it->second);
1066+
}
10721067
}
10731068

10741069
String ColumnDynamic::getTypeNameAt(size_t row_num) const

src/Columns/ColumnDynamic.h

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -306,10 +306,7 @@ class ColumnDynamic final : public COWHelper<IColumnHelper<ColumnDynamic>, Colum
306306
variant_column_ptr->protect();
307307
}
308308

309-
ColumnCheckpointPtr getCheckpoint() const override
310-
{
311-
return variant_column_ptr->getCheckpoint();
312-
}
309+
ColumnCheckpointPtr getCheckpoint() const override;
313310

314311
void updateCheckpoint(ColumnCheckpoint & checkpoint) const override;
315312

@@ -501,6 +498,16 @@ class ColumnDynamic final : public COWHelper<IColumnHelper<ColumnDynamic>, Colum
501498
std::unordered_map<String, SerializationPtr> serialization_cache;
502499
};
503500

501+
struct DynamicColumnCheckpoint : public ColumnCheckpoint
502+
{
503+
DynamicColumnCheckpoint(size_t size_, std::unordered_map<String, ColumnCheckpointPtr> variants_checkpoints_) : ColumnCheckpoint(size_), variants_checkpoints(variants_checkpoints_)
504+
{
505+
}
506+
507+
std::unordered_map<String, ColumnCheckpointPtr> variants_checkpoints;
508+
};
509+
510+
504511
void extendVariantColumn(
505512
IColumn & variant_column,
506513
const DataTypePtr & old_variant_type,

src/Columns/tests/gtest_column_dynamic.cpp

Lines changed: 24 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -937,55 +937,60 @@ TEST(ColumnDynamic, rollback)
937937
ASSERT_EQ(num_rows, column_variant.size());
938938
};
939939

940-
auto check_checkpoint = [&](const ColumnCheckpoint & cp, std::vector<size_t> sizes)
940+
auto check_checkpoint = [&](const ColumnCheckpoint & cp, std::unordered_map<String, size_t> sizes)
941941
{
942-
const auto & nested = assert_cast<const ColumnCheckpointWithMultipleNested &>(cp).nested;
942+
const auto & variants_checkpoints = assert_cast<const DynamicColumnCheckpoint &>(cp).variants_checkpoints;
943943
size_t num_rows = 0;
944944

945-
for (size_t i = 0; i < nested.size(); ++i)
945+
for (const auto & [variant, checkpoint] : variants_checkpoints)
946946
{
947-
ASSERT_EQ(nested[i]->size, sizes[i]);
948-
num_rows += sizes[i];
947+
ASSERT_EQ(checkpoint->size, sizes.at(variant));
948+
num_rows += sizes.at(variant);
949949
}
950950

951951
ASSERT_EQ(num_rows, cp.size);
952952
};
953953

954-
std::vector<std::pair<ColumnCheckpointPtr, std::vector<size_t>>> checkpoints;
954+
std::vector<std::vector<size_t>> variant_checkpoints_sizes;
955+
std::vector<std::pair<ColumnCheckpointPtr, std::unordered_map<String, size_t>>> dynamic_checkpoints;
955956

956957
auto column = ColumnDynamic::create(2);
957958
auto checkpoint = column->getCheckpoint();
958959

959960
column->insert(Field(42));
960-
961961
column->updateCheckpoint(*checkpoint);
962-
checkpoints.emplace_back(checkpoint, std::vector<size_t>{0, 1, 0});
963-
964962
column->insert(Field("str1"));
965963
column->rollback(*checkpoint);
966964

967-
check_checkpoint(*checkpoint, checkpoints.back().second);
968-
check_variant(column->getVariantColumn(), checkpoints.back().second);
965+
variant_checkpoints_sizes.emplace_back(std::vector<size_t>{0, 1, 0});
966+
dynamic_checkpoints.emplace_back(checkpoint, std::unordered_map<String, size_t>{{"SharedVariant", 0}, {"Int8", 1}, {"String", 0}});
967+
968+
check_checkpoint(*checkpoint, dynamic_checkpoints.back().second);
969+
check_variant(column->getVariantColumn(), variant_checkpoints_sizes.back());
969970

970971
column->insert("str1");
971-
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 1});
972+
variant_checkpoints_sizes.emplace_back(std::vector<size_t>{0, 1, 1});
973+
dynamic_checkpoints.emplace_back(column->getCheckpoint(), std::unordered_map<String, size_t>{{"SharedVariant", 0}, {"Int8", 1}, {"String", 1}});
972974

973975
column->insert("str2");
974-
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{0, 1, 2});
976+
variant_checkpoints_sizes.emplace_back(std::vector<size_t>{0, 1, 2});
977+
dynamic_checkpoints.emplace_back(column->getCheckpoint(), std::unordered_map<String, size_t>{{"SharedVariant", 0}, {"Int8", 1}, {"String", 2}});
975978

976979
column->insert(Array({1, 2}));
977-
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{1, 1, 2});
980+
variant_checkpoints_sizes.emplace_back(std::vector<size_t>{1, 1, 2});
981+
dynamic_checkpoints.emplace_back(column->getCheckpoint(), std::unordered_map<String, size_t>{{"SharedVariant", 1}, {"Int8", 1}, {"String", 2}});
978982

979983
column->insert(Field(42.42));
980-
checkpoints.emplace_back(column->getCheckpoint(), std::vector<size_t>{2, 1, 2});
984+
variant_checkpoints_sizes.emplace_back(std::vector<size_t>{2, 1, 2});
985+
dynamic_checkpoints.emplace_back(column->getCheckpoint(), std::unordered_map<String, size_t>{{"SharedVariant", 2}, {"Int8", 1}, {"String", 2}});
981986

982-
for (const auto & [cp, sizes] : checkpoints)
987+
for (size_t i = 0; i != variant_checkpoints_sizes.size(); ++i)
983988
{
984989
auto column_copy = column->clone();
985-
column_copy->rollback(*cp);
990+
column_copy->rollback(*dynamic_checkpoints[i].first);
986991

987-
check_checkpoint(*cp, sizes);
988-
check_variant(assert_cast<const ColumnDynamic &>(*column_copy).getVariantColumn(), sizes);
992+
check_checkpoint(*dynamic_checkpoints[i].first, dynamic_checkpoints[i].second);
993+
check_variant(assert_cast<const ColumnDynamic &>(*column_copy).getVariantColumn(), variant_checkpoints_sizes[i]);
989994
}
990995
}
991996

src/DataTypes/Serializations/SerializationObject.cpp

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -831,8 +831,12 @@ void SerializationObject::serializeBinary(const IColumn & col, size_t row_num, W
831831
size_t offset = shared_data_offsets[ssize_t(row_num) - 1];
832832
size_t end = shared_data_offsets[ssize_t(row_num)];
833833

834+
/// Calculate number of non-null dynamic paths.
835+
size_t non_null_dynamic_pats = 0;
836+
for (const auto & [_, column] : dynamic_paths)
837+
non_null_dynamic_pats += !column->isNullAt(row_num);
834838
/// Serialize number of paths and then pairs (path, value).
835-
writeVarUInt(typed_paths.size() + dynamic_paths.size() + (end - offset), ostr);
839+
writeVarUInt(typed_paths.size() + non_null_dynamic_pats + (end - offset), ostr);
836840

837841
for (const auto & [path, column] : typed_paths)
838842
{
@@ -842,8 +846,11 @@ void SerializationObject::serializeBinary(const IColumn & col, size_t row_num, W
842846

843847
for (const auto & [path, column] : dynamic_paths)
844848
{
845-
writeStringBinary(path, ostr);
846-
dynamic_serialization->serializeBinary(*column, row_num, ostr, settings);
849+
if (!column->isNullAt(row_num))
850+
{
851+
writeStringBinary(path, ostr);
852+
dynamic_serialization->serializeBinary(*column, row_num, ostr, settings);
853+
}
847854
}
848855

849856
const auto [shared_data_paths, shared_data_values] = column_object.getSharedDataPathsAndValues();

src/DataTypes/Serializations/SerializationVariant.cpp

Lines changed: 14 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -477,7 +477,11 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams(
477477
if (auto cached_discriminators = getFromSubstreamsCache(cache, settings.path))
478478
{
479479
variant_state = checkAndGetState<DeserializeBinaryBulkStateVariant>(state);
480-
col.getLocalDiscriminatorsPtr() = cached_discriminators;
480+
/// If rows_offset is set, in cache we store discriminators from the current range without applied offset.
481+
if (rows_offset)
482+
col.getLocalDiscriminatorsPtr()->assumeMutable()->insertRangeFrom(*cached_discriminators, 0, cached_discriminators->size());
483+
else
484+
col.getLocalDiscriminatorsPtr() = cached_discriminators;
481485
}
482486
else if (auto * discriminators_stream = settings.getter(settings.path))
483487
{
@@ -491,7 +495,7 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams(
491495
if (discriminators_state->mode.value == DiscriminatorsSerializationMode::BASIC)
492496
{
493497
SerializationNumber<ColumnVariant::Discriminator>().deserializeBinaryBulk(
494-
*col.getLocalDiscriminatorsPtr()->assumeMutable(), *discriminators_stream, rows_offset, limit, 0);
498+
*col.getLocalDiscriminatorsPtr()->assumeMutable(), *discriminators_stream, 0, rows_offset + limit, 0);
495499
}
496500
else
497501
{
@@ -503,10 +507,17 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams(
503507
variant_limits = variant_pair.second;
504508
}
505509

510+
/// If we have rows_offset, we must put discriminators without applied rows_offset in cache because we
511+
/// need these discriminators to calculate offsets for variants after we get them from cache.
506512
if (rows_offset)
507-
addToSubstreamsCache(cache, settings.path, IColumn::mutate(col.getLocalDiscriminatorsPtr()));
513+
{
514+
size_t num_read_discriminators = col.getLocalDiscriminatorsPtr()->size() - variant_state->num_rows_read;
515+
addToSubstreamsCache(cache, settings.path, col.getLocalDiscriminatorsPtr()->cut(variant_state->num_rows_read, num_read_discriminators));
516+
}
508517
else
518+
{
509519
addToSubstreamsCache(cache, settings.path, col.getLocalDiscriminatorsPtr());
520+
}
510521
}
511522
/// It may happen that there is no such stream, in this case just do nothing.
512523
else

src/DataTypes/Serializations/SerializationVariantElement.cpp

Lines changed: 25 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -120,7 +120,21 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
120120
if (auto cached_discriminators = getFromSubstreamsCache(cache, settings.path))
121121
{
122122
variant_element_state = checkAndGetState<DeserializeBinaryBulkStateVariantElement>(state);
123-
variant_element_state->discriminators = cached_discriminators;
123+
/// If rows_offset is set, in cache we store discriminators from the current range without applied offset.
124+
if (rows_offset)
125+
{
126+
if (!variant_element_state->discriminators || result_column->empty())
127+
{
128+
variant_element_state->discriminators = ColumnVariant::ColumnDiscriminators::create();
129+
variant_element_state->discriminators_size = 0;
130+
}
131+
132+
variant_element_state->discriminators->assumeMutable()->insertRangeFrom(*cached_discriminators, 0, cached_discriminators->size());
133+
}
134+
else
135+
{
136+
variant_element_state->discriminators = cached_discriminators;
137+
}
124138
}
125139
else if (auto * discriminators_stream = settings.getter(settings.path))
126140
{
@@ -129,10 +143,9 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
129143

130144
/// If we started to read a new column, reinitialize discriminators column in deserialization state.
131145
if (!variant_element_state->discriminators || result_column->empty())
132-
{
133146
variant_element_state->discriminators = ColumnVariant::ColumnDiscriminators::create();
134-
variant_element_state->discriminators_size = 0;
135-
}
147+
148+
variant_element_state->discriminators_size = variant_element_state->discriminators->size();
136149

137150
/// Deserialize discriminators according to serialization mode.
138151
if (discriminators_state->mode.value == SerializationVariant::DiscriminatorsSerializationMode::BASIC)
@@ -156,10 +169,17 @@ void SerializationVariantElement::deserializeBinaryBulkWithMultipleStreams(
156169
variant_limit = variant_pair.second;
157170
}
158171

172+
/// If we have rows_offset, we must put discriminators without applied rows_offset in cache because we
173+
/// need these discriminators to calculate offsets for variants after we get them from cache.
159174
if (rows_offset)
160-
addToSubstreamsCache(cache, settings.path, IColumn::mutate(variant_element_state->discriminators));
175+
{
176+
size_t num_read_discriminators = variant_element_state->discriminators->size() - variant_element_state->discriminators_size;
177+
addToSubstreamsCache(cache, settings.path, variant_element_state->discriminators->cut(variant_element_state->discriminators_size, num_read_discriminators));
178+
}
161179
else
180+
{
162181
addToSubstreamsCache(cache, settings.path, variant_element_state->discriminators);
182+
}
163183
}
164184
else
165185
{

src/DataTypes/Serializations/SerializationVariantElementNullMap.cpp

Lines changed: 30 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -104,7 +104,21 @@ void SerializationVariantElementNullMap::deserializeBinaryBulkWithMultipleStream
104104
if (auto cached_discriminators = getFromSubstreamsCache(cache, settings.path))
105105
{
106106
variant_element_null_map_state = checkAndGetState<DeserializeBinaryBulkStateVariantElementNullMap>(state);
107-
variant_element_null_map_state->discriminators = cached_discriminators;
107+
/// If rows_offset is set, in cache we store discriminators from the current range without applied offset.
108+
if (rows_offset)
109+
{
110+
if (!variant_element_null_map_state->discriminators || result_column->empty())
111+
{
112+
variant_element_null_map_state->discriminators = ColumnVariant::ColumnDiscriminators::create();
113+
variant_element_null_map_state->num_rows_read = 0;
114+
}
115+
116+
variant_element_null_map_state->discriminators->assumeMutable()->insertRangeFrom(*cached_discriminators, 0, cached_discriminators->size());
117+
}
118+
else
119+
{
120+
variant_element_null_map_state->discriminators = cached_discriminators;
121+
}
108122
}
109123
else if (auto * discriminators_stream = settings.getter(settings.path))
110124
{
@@ -116,6 +130,10 @@ void SerializationVariantElementNullMap::deserializeBinaryBulkWithMultipleStream
116130
if (!variant_element_null_map_state->discriminators || result_column->empty())
117131
variant_element_null_map_state->discriminators = ColumnVariant::ColumnDiscriminators::create();
118132

133+
/// Now we are sure that discriminators are not in cache and we can save the size of discriminators now to know how
134+
/// many discriminators were actually deserialized to iterate over them later to calculate limits for variants.
135+
variant_element_null_map_state->num_rows_read = variant_element_null_map_state->discriminators->size();
136+
119137
/// Deserialize discriminators according to serialization mode.
120138
if (discriminators_state->mode.value == SerializationVariant::DiscriminatorsSerializationMode::BASIC)
121139
{
@@ -137,7 +155,17 @@ void SerializationVariantElementNullMap::deserializeBinaryBulkWithMultipleStream
137155
variant_limit = variant_pair.second;
138156
}
139157

140-
addToSubstreamsCache(cache, settings.path, variant_element_null_map_state->discriminators);
158+
/// If we have rows_offset, we must put discriminators without applied rows_offset in cache because we
159+
/// need these discriminators to calculate offsets for variants after we get them from cache.
160+
if (rows_offset)
161+
{
162+
size_t num_read_discriminators = variant_element_null_map_state->discriminators->size() - variant_element_null_map_state->num_rows_read;
163+
addToSubstreamsCache(cache, settings.path, variant_element_null_map_state->discriminators->cut(variant_element_null_map_state->num_rows_read, num_read_discriminators));
164+
}
165+
else
166+
{
167+
addToSubstreamsCache(cache, settings.path, variant_element_null_map_state->discriminators);
168+
}
141169
}
142170
else
143171
{

0 commit comments

Comments
 (0)