Skip to content

Commit 7387328

Browse files
authored
Merge pull request #80179 from ClickHouse/out_of_order_buckets
Allow replicas to send some buckets out of order
2 parents cea91ea + f29d418 commit 7387328

35 files changed

Lines changed: 363 additions & 148 deletions

src/Core/BlockInfo.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@ namespace ErrorCodes
1818

1919

2020
/// Write values in binary form. NOTE: You could use protobuf, but it would be overkill for this case.
21-
void BlockInfo::write(WriteBuffer & out) const
21+
void BlockInfo::write(WriteBuffer & out, UInt64 server_protocol_revision) const
2222
{
2323
/// Set of pairs `FIELD_NUM`, value in binary form. Then 0.
24-
#define WRITE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
25-
writeVarUInt(FIELD_NUM, out); \
26-
writeBinary(NAME, out);
24+
#define WRITE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM, MIN_PROTOCOL_REVISION) \
25+
if (server_protocol_revision >= (MIN_PROTOCOL_REVISION)) \
26+
{ \
27+
writeVarUInt(FIELD_NUM, out); \
28+
writeBinary(NAME, out); \
29+
}
2730

2831
APPLY_FOR_BLOCK_INFO_FIELDS(WRITE_FIELD)
2932

@@ -32,7 +35,7 @@ void BlockInfo::write(WriteBuffer & out) const
3235
}
3336

3437
/// Read values in binary form.
35-
void BlockInfo::read(ReadBuffer & in)
38+
void BlockInfo::read(ReadBuffer & in, UInt64 client_protocol_revision)
3639
{
3740
UInt64 field_num = 0;
3841

@@ -44,10 +47,11 @@ void BlockInfo::read(ReadBuffer & in)
4447

4548
switch (field_num)
4649
{
47-
#define READ_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
48-
case FIELD_NUM: \
49-
readBinary(NAME, in); \
50-
break;
50+
#define READ_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM, MIN_PROTOCOL_REVISION) \
51+
case FIELD_NUM: \
52+
if (client_protocol_revision >= (MIN_PROTOCOL_REVISION)) \
53+
readBinary(NAME, in); \
54+
break;
5155

5256
APPLY_FOR_BLOCK_INFO_FIELDS(READ_FIELD)
5357

src/Core/BlockInfo.h

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#pragma once
22

3+
#include <Core/ProtocolDefines.h>
34
#include <base/types.h>
45

6+
#include <vector>
7+
58
namespace DB
69
{
710

@@ -14,7 +17,7 @@ struct BlockInfo
1417
{
1518
/** is_overflows:
1619
* After running GROUP BY ... WITH TOTALS with the max_rows_to_group_by and group_by_overflow_mode = 'any' settings,
17-
* a row is inserted in the separate block with aggregated values that have not passed max_rows_to_group_by.
20+
* a row is inserted in the separate block with aggregated values that have not passed max_rows_to_group_by.
1821
* If it is such a block, then is_overflows is set to true for it.
1922
*/
2023

@@ -24,22 +27,28 @@ struct BlockInfo
2427
* Otherwise -1.
2528
*/
2629

27-
#define APPLY_FOR_BLOCK_INFO_FIELDS(M) \
28-
M(bool, is_overflows, false, 1) \
29-
M(Int32, bucket_num, -1, 2)
30+
/** out_of_order_buckets:
31+
* List of id-s of buckets delayed by `ConvertingAggregatedToChunksTransform` on the current node.
32+
* Please refer to the comment in `ConvertingAggregatedToChunksTransform` for more details.
33+
*/
34+
35+
#define APPLY_FOR_BLOCK_INFO_FIELDS(M) \
36+
M(bool, is_overflows, false, 1, 0) \
37+
M(Int32, bucket_num, -1, 2, 0) \
38+
M(std::vector<Int32>, out_of_order_buckets, {}, 3, DBMS_MIN_REVISION_WITH_OUT_OF_ORDER_BUCKETS_IN_AGGREGATION)
3039

31-
#define DECLARE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
40+
#define DECLARE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM, MIN_PROTOCOL_REVISION) \
3241
TYPE NAME = DEFAULT;
3342

3443
APPLY_FOR_BLOCK_INFO_FIELDS(DECLARE_FIELD)
3544

3645
#undef DECLARE_FIELD
3746

3847
/// Write the values in binary form. NOTE: You could use protobuf, but it would be overkill for this case.
39-
void write(WriteBuffer & out) const;
48+
void write(WriteBuffer & out, UInt64 server_protocol_revision) const;
4049

4150
/// Read the values in binary form.
42-
void read(ReadBuffer & in);
51+
void read(ReadBuffer & in, UInt64 client_protocol_revision);
4352
};
4453

4554
}

src/Core/ProtocolDefines.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,14 @@ static constexpr auto DBMS_MIN_REVISON_WITH_PARALLEL_BLOCK_MARSHALLING = 54478;
112112

113113
static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_CLUSTER_FUNCTION_PROTOCOL = 54479;
114114

115+
static constexpr auto DBMS_MIN_REVISION_WITH_OUT_OF_ORDER_BUCKETS_IN_AGGREGATION = 54480;
116+
115117
/// Version of ClickHouse TCP protocol.
116118
///
117119
/// Should be incremented manually on protocol changes.
118120
///
119121
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing in common with VERSION_REVISION,
120122
/// the latter is just a number for the server version (one number instead of commit SHA)
121123
/// for simplicity (sometimes it may be more convenient in some use cases).
122-
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54479;
124+
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54480;
123125
}

src/Core/Settings.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6770,6 +6770,11 @@ When the query prioritization mechanism is employed (see setting `priority`), lo
67706770
)", BETA) \
67716771
DECLARE(Float, min_os_cpu_wait_time_ratio_to_throw, 0.0, "Min ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 0 at this point.", 0) \
67726772
DECLARE(Float, max_os_cpu_wait_time_ratio_to_throw, 0.0, "Max ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 1 at this point.", 0) \
6773+
DECLARE(Bool, enable_producing_buckets_out_of_order_in_aggregation, true, R"(
6774+
Allow memory-efficient aggregation (see `distributed_aggregation_memory_efficient`) to produce buckets out of order.
6775+
It may improve performance when aggregation bucket sizes are skewed by letting a replica send buckets with higher id-s to the initiator while it is still processing some heavy buckets with lower id-s.
6776+
The downside is potentially higher memory usage.
6777+
)", 0) \
67736778
DECLARE(Bool, enable_parallel_blocks_marshalling, true, "Affects only distributed queries. If enabled, blocks will be (de)serialized and (de)compressed on pipeline threads (i.e. with higher parallelism that what we have by default) before/after sending to the initiator.", 0) \
67746779
DECLARE(UInt64, min_outstreams_per_resize_after_split, 24, R"(
67756780
Specifies the minimum number of output streams of a `Resize` or `StrictResize` processor after the split is performed during pipeline generation. If the resulting number of streams is less than this value, the split operation will not occur.

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7373
{"use_roaring_bitmap_iceberg_positional_deletes", false, false, "New setting"},
7474
{"iceberg_metadata_compression_method", "", "", "New setting"},
7575
{"allow_experimental_correlated_subqueries", false, true, "Mark correlated subqueries support as Beta."},
76+
{"enable_producing_buckets_out_of_order_in_aggregation", false, true, "New setting"},
7677
});
7778
addSettingsChanges(settings_changes_history, "25.7",
7879
{

src/Formats/NativeReader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ Block NativeReader::read()
137137

138138
/// Additional information about the block.
139139
if (server_revision > 0)
140-
res.info.read(istr);
140+
res.info.read(istr, server_revision);
141141

142142
/// Dimensions
143143
size_t columns = 0;

src/Formats/NativeWriter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ size_t NativeWriter::write(const Block & block)
110110

111111
/// Additional information about the block.
112112
if (client_revision > 0)
113-
block.info.write(ostr);
113+
block.info.write(ostr, client_revision);
114114

115115
block.checkNumberOfRows();
116116

src/Interpreters/Aggregator.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ Aggregator::Params::Params(
209209
bool only_merge_, // true for projections
210210
bool optimize_group_by_constant_keys_,
211211
float min_hit_rate_to_use_consecutive_keys_optimization_,
212-
const StatsCollectingParams & stats_collecting_params_)
212+
const StatsCollectingParams & stats_collecting_params_,
213+
bool enable_producing_buckets_out_of_order_in_aggregation_)
213214
: keys(keys_)
214215
, keys_size(keys.size())
215216
, aggregates(aggregates_)
@@ -232,6 +233,7 @@ Aggregator::Params::Params(
232233
, optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
233234
, min_hit_rate_to_use_consecutive_keys_optimization(min_hit_rate_to_use_consecutive_keys_optimization_)
234235
, stats_collecting_params(stats_collecting_params_)
236+
, enable_producing_buckets_out_of_order_in_aggregation(enable_producing_buckets_out_of_order_in_aggregation_)
235237
{
236238
}
237239

src/Interpreters/Aggregator.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ class Aggregator final
117117
const float min_hit_rate_to_use_consecutive_keys_optimization = 0.;
118118
StatsCollectingParams stats_collecting_params;
119119

120+
bool enable_producing_buckets_out_of_order_in_aggregation = true;
121+
120122
static size_t getMaxBytesBeforeExternalGroupBy(size_t max_bytes_before_external_group_by, double max_bytes_ratio_before_external_group_by);
121123

122124
Params(
@@ -139,7 +141,8 @@ class Aggregator final
139141
bool only_merge_, // true for projections
140142
bool optimize_group_by_constant_keys_,
141143
float min_hit_rate_to_use_consecutive_keys_optimization_,
142-
const StatsCollectingParams & stats_collecting_params_);
144+
const StatsCollectingParams & stats_collecting_params_,
145+
bool enable_producing_buckets_out_of_order_in_aggregation_);
143146

144147
/// Only parameters that matter during merge.
145148
Params(

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ namespace Setting
195195
extern const SettingsUInt64 max_rows_to_transfer;
196196
extern const SettingsOverflowMode transfer_overflow_mode;
197197
extern const SettingsString implicit_table_at_top_level;
198+
extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation;
198199
}
199200

200201
namespace ServerSetting
@@ -2782,16 +2783,16 @@ static Aggregator::Params getAggregatorParams(
27822783
context.getServerSettings()[ServerSetting::max_entries_for_hash_table_stats],
27832784
settings[Setting::max_size_to_preallocate_for_aggregation]);
27842785

2785-
return Aggregator::Params
2786-
{
2786+
return Aggregator::Params{
27872787
keys,
27882788
aggregates,
27892789
overflow_row,
27902790
settings[Setting::max_rows_to_group_by],
27912791
settings[Setting::group_by_overflow_mode],
27922792
group_by_two_level_threshold,
27932793
group_by_two_level_threshold_bytes,
2794-
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
2794+
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(
2795+
settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
27952796
settings[Setting::empty_result_for_aggregation_by_empty_set]
27962797
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty()
27972798
&& query_analyzer.hasConstAggregationKeys()),
@@ -2805,8 +2806,8 @@ static Aggregator::Params getAggregatorParams(
28052806
/* only_merge */ false,
28062807
settings[Setting::optimize_group_by_constant_keys],
28072808
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
2808-
stats_collecting_params
2809-
};
2809+
stats_collecting_params,
2810+
settings[Setting::enable_producing_buckets_out_of_order_in_aggregation]};
28102811
}
28112812

28122813
void InterpreterSelectQuery::executeAggregation(

0 commit comments

Comments
 (0)