@@ -34,7 +34,8 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
3434 const Names & required_result_column_names)
3535 : options(options_),
3636 query_ptr (query_ptr_),
37- context(std::make_shared<Context>(context_))
37+ context(std::make_shared<Context>(context_)),
38+ max_streams(context->getSettingsRef ().max_threads)
3839{
3940 const auto & ast = query_ptr->as <ASTSelectWithUnionQuery &>();
4041
@@ -196,14 +197,23 @@ BlockInputStreams InterpreterSelectWithUnionQuery::executeWithMultipleStreams(Qu
196197 parent_pipeline.addInterpreterContext (context);
197198 }
198199
200+ // / Update max_streams due to:
201+ // / - max_distributed_connections for Distributed() engine
202+ // / - max_streams_to_max_threads_ratio
203+ // /
204+ // / XXX: res.pipeline.getMaxThreads() cannot be used since it is capped to
205+ // / number of streams, which is empty for non-Processors case.
206+ max_streams = (*std::min_element (nested_interpreters.begin (), nested_interpreters.end (), [](const auto &a, const auto &b)
207+ {
208+ return a->getMaxStreams () < b->getMaxStreams ();
209+ }))->getMaxStreams ();
210+
199211 return nested_streams;
200212}
201213
202214
203215BlockIO InterpreterSelectWithUnionQuery::execute ()
204216{
205- const Settings & settings = context->getSettingsRef ();
206-
207217 BlockIO res;
208218 BlockInputStreams nested_streams = executeWithMultipleStreams (res.pipeline );
209219 BlockInputStreamPtr result_stream;
@@ -219,7 +229,7 @@ BlockIO InterpreterSelectWithUnionQuery::execute()
219229 }
220230 else
221231 {
222- result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr , settings. max_threads );
232+ result_stream = std::make_shared<UnionBlockInputStream>(nested_streams, nullptr , max_streams );
223233 nested_streams.clear ();
224234 }
225235
0 commit comments