Skip to content

Commit 9a2760d

Browse files
Merge pull request #9673 from azat/processors-max_distributed_connections
Fix max_distributed_connections
2 parents 9a41113 + 550cfef commit 9a2760d

12 files changed

Lines changed: 58 additions & 14 deletions

dbms/programs/server/TCPHandler.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ void TCPHandler::runImpl()
265265
state.io.onFinish();
266266
}
267267
else if (state.io.pipeline.initialized())
268-
processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);
268+
processOrdinaryQueryWithProcessors();
269269
else
270270
processOrdinaryQuery();
271271

@@ -544,13 +544,10 @@ void TCPHandler::processOrdinaryQuery()
544544
}
545545

546546

547-
void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
547+
void TCPHandler::processOrdinaryQueryWithProcessors()
548548
{
549549
auto & pipeline = state.io.pipeline;
550550

551-
/// Reduce the number of threads to recommended value.
552-
num_threads = std::min(num_threads, pipeline.getNumThreads());
553-
554551
/// Send header-block, to allow client to prepare output format for data to send.
555552
{
556553
auto & header = pipeline.getHeader();
@@ -585,7 +582,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
585582

586583
try
587584
{
588-
executor->execute(num_threads);
585+
executor->execute(pipeline.getNumThreads());
589586
}
590587
catch (...)
591588
{

dbms/programs/server/TCPHandler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ class TCPHandler : public Poco::Net::TCPServerConnection
171171
/// Process a request that does not require the receiving of data blocks from the client
172172
void processOrdinaryQuery();
173173

174-
void processOrdinaryQueryWithProcessors(size_t num_threads);
174+
void processOrdinaryQueryWithProcessors();
175175

176176
void processTablesStatusRequest();
177177

dbms/src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,6 @@ QueryPipeline InterpreterSelectQuery::executeWithProcessors()
467467
{
468468
QueryPipeline query_pipeline;
469469
executeImpl(query_pipeline, input, std::move(input_pipe), query_pipeline);
470-
query_pipeline.setMaxThreads(max_streams);
471470
query_pipeline.addInterpreterContext(context);
472471
query_pipeline.addStorageHolder(storage);
473472
return query_pipeline;
@@ -1301,6 +1300,7 @@ void InterpreterSelectQuery::executeFetchColumns(
13011300
{
13021301
is_remote = true;
13031302
max_streams = settings.max_distributed_connections;
1303+
pipeline.setMaxThreads(max_streams);
13041304
}
13051305

13061306
UInt64 max_block_size = settings.max_block_size;
@@ -1325,6 +1325,7 @@ void InterpreterSelectQuery::executeFetchColumns(
13251325
{
13261326
max_block_size = std::max(UInt64(1), limit_length + limit_offset);
13271327
max_streams = 1;
1328+
pipeline.setMaxThreads(max_streams);
13281329
}
13291330

13301331
if (!max_block_size)

dbms/src/Interpreters/InterpreterSelectQuery.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ class InterpreterSelectQuery : public IInterpreter
9292

9393
ASTPtr getQuery() const { return query_ptr; }
9494

95+
size_t getMaxStreams() const { return max_streams; }
96+
9597
private:
9698
InterpreterSelectQuery(
9799
const ASTPtr & query_ptr_,
@@ -122,6 +124,9 @@ class InterpreterSelectQuery : public IInterpreter
122124
BlockInputStreamPtr stream_with_non_joined_data;
123125
bool union_stream = false;
124126

127+
/// Cache value of InterpreterSelectQuery::max_streams
128+
size_t max_threads = 1;
129+
125130
BlockInputStreamPtr & firstStream() { return streams.at(0); }
126131

127132
template <typename Transform>
@@ -147,6 +152,10 @@ class InterpreterSelectQuery : public IInterpreter
147152

148153
bool hasDelayedStream() const { return stream_with_non_joined_data != nullptr; }
149154
bool initialized() const { return !streams.empty(); }
155+
156+
/// Compatibility with QueryPipeline (Processors)
157+
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
158+
size_t getNumThreads() const { return max_threads; }
150159
};
151160

152161
template <typename TPipeline>

dbms/src/Interpreters/InterpreterSelectWithUnionQuery.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
3434
const Names & required_result_column_names)
3535
: options(options_),
3636
query_ptr(query_ptr_),
37-
context(std::make_shared<Context>(context_))
37+
context(std::make_shared<Context>(context_)),
38+
max_streams(context->getSettingsRef().max_threads)
3839
{
3940
const auto & ast = query_ptr->as<ASTSelectWithUnionQuery &>();
4041

@@ -196,14 +197,23 @@ BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(Qu
196197
parent_pipeline.addInterpreterContext(context);
197198
}
198199

200+
/// Update max_streams due to:
201+
/// - max_distributed_connections for Distributed() engine
202+
/// - max_streams_to_max_threads_ratio
203+
///
204+
/// XXX: res.pipeline.getMaxThreads() cannot be used since it is capped to
205+
/// the number of streams, which is zero in the non-Processors case.
206+
max_streams = (*std::min_element(nested_interpreters.begin(), nested_interpreters.end(), [](const auto &a, const auto &b)
207+
{
208+
return a->getMaxStreams() < b->getMaxStreams();
209+
}))->getMaxStreams();
210+
199211
return nested_streams;
200212
}
201213

202214

203215
BlockIO InterpreterSelectWithUnionQuery::execute()
204216
{
205-
const Settings & settings = context->getSettingsRef();
206-
207217
BlockIO res;
208218
BlockInputStreams nested_streams = executeWithMultipleStreams(res.pipeline);
209219
BlockInputStreamPtr result_stream;
@@ -219,7 +229,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
219229
}
220230
else
221231
{
222-
result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, settings.max_threads);
232+
result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr, max_streams);
223233
nested_streams.clear();
224234
}
225235

dbms/src/Interpreters/InterpreterSelectWithUnionQuery.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class InterpreterSelectWithUnionQuery : public IInterpreter
5656

5757
Block result_header;
5858

59+
size_t max_streams = 1;
60+
5961
static Block getCommonHeaderForUnion(const Blocks & headers);
6062
};
6163

dbms/src/Interpreters/executeQuery.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ void executeQuery(
751751

752752
{
753753
auto executor = pipeline.execute();
754-
executor->execute(context.getSettingsRef().max_threads);
754+
executor->execute(pipeline.getNumThreads());
755755
}
756756
}
757757
}

dbms/src/Storages/StorageDistributed.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ Pipes StorageDistributed::read(
427427
if (!smaller_cluster)
428428
{
429429
LOG_DEBUG(log, "Reading from " << table_id.getNameForLogs() <<
430-
(has_sharding_key ? "" : "(no sharding key)") << ": "
430+
(has_sharding_key ? "" : " (no sharding key)") << ": "
431431
"Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - "
432432
"the query will be sent to all shards of the cluster");
433433

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
0
2+
0
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/usr/bin/env bash
2+
3+
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
4+
. $CURDIR/../shell_config.sh
5+
6+
opts=(
7+
--max_distributed_connections 2
8+
--max_threads 1
9+
--query "SELECT sleepEachRow(1) FROM remote('127.{2,3}', system.one)"
10+
)
11+
# 1.8s is less than 2 seconds, but long enough to cover possible load peaks
12+
# "$@" left to pass manual options (like --experimental_use_processors 0) during manual testing
13+
timeout 1.8s $CLICKHOUSE_CLIENT "${opts[@]}" "$@"

0 commit comments

Comments
 (0)