Skip to content

Commit cbb539c

Browse files
Merge pull request ClickHouse#12103 from ClickHouse/fix-12030
Fix over-limiting the number of threads for union.
2 parents 072a8e0 + 0a935dd commit cbb539c

5 files changed

Lines changed: 29 additions & 12 deletions

File tree

src/Processors/QueryPipeline.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,8 +564,13 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
564564
}
565565

566566
void QueryPipeline::unitePipelines(
567-
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header)
567+
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit)
568568
{
569+
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
570+
/// If true, result max_threads will be sum(max_threads).
571+
/// Note: it may be > than settings.max_threads, so we should apply this limit again.
572+
bool will_limit_max_threads = !initialized() || max_threads != 0;
573+
569574
if (initialized())
570575
{
571576
addSimpleTransform([&](const Block & header)
@@ -630,9 +635,20 @@ void QueryPipeline::unitePipelines(
630635
interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end());
631636
storage_holders.insert(storage_holders.end(), pipeline.storage_holders.begin(), pipeline.storage_holders.end());
632637

633-
max_threads = std::max(max_threads, pipeline.max_threads);
638+
max_threads += pipeline.max_threads;
639+
will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0;
640+
641+
/// If one of pipelines uses more threads then current limit, will keep it.
642+
/// It may happen if max_distributed_connections > max_threads
643+
if (pipeline.max_threads > max_threads_limit)
644+
max_threads_limit = pipeline.max_threads;
634645
}
635646

647+
if (!will_limit_max_threads)
648+
max_threads = 0;
649+
else
650+
limitMaxThreads(max_threads_limit);
651+
636652
if (!extremes.empty())
637653
{
638654
if (extremes.size() == 1)

src/Processors/QueryPipeline.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ class QueryPipeline
161161

162162
/// Unite several pipelines together. Result pipeline would have common_header structure.
163163
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
164-
void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header);
164+
void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit = 0);
165165

166166
PipelineExecutorPtr execute();
167167

src/Processors/QueryPlan/UnionStep.cpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,7 @@ QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines)
3030
return pipeline;
3131
}
3232

33-
size_t num_pipelines = pipelines.size();
34-
pipeline->unitePipelines(std::move(pipelines), output_stream->header);
35-
36-
if (num_pipelines > 1)
37-
{
38-
// nested queries can force 1 thread (due to simplicity)
39-
// but in case of union this cannot be done.
40-
pipeline->setMaxThreads(std::min<UInt64>(num_pipelines, max_threads));
41-
}
33+
pipeline->unitePipelines(std::move(pipelines), output_stream->header ,max_threads);
4234

4335
processors = collector.detachProcessors();
4436
return pipeline;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
300
2+
1
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
set log_queries = 1;
2+
set max_threads = 16;
3+
4+
SELECT count() FROM (SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100);
5+
6+
system flush logs;
7+
select length(thread_ids) >= 16 from system.query_log where event_date >= today() - 1 and query like '%SELECT count() FROM (SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100)%' and type = 'QueryFinish' order by query_start_time desc limit 1;

0 commit comments

Comments
 (0)