@@ -34,6 +34,7 @@ namespace Setting
3434 extern const SettingsBool async_query_sending_for_remote;
3535 extern const SettingsBool async_socket_for_remote;
3636 extern const SettingsBool skip_unavailable_shards;
37+ extern const SettingsNonZeroUInt64 max_parallel_replicas;
3738}
3839
3940namespace ErrorCodes
@@ -59,15 +60,19 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
5960 if (filter_actions_dag)
6061 predicate = filter_actions_dag->getOutputs ().at (0 );
6162
62- createExtension (predicate);
63+ auto max_replicas_to_use = static_cast <UInt64>(cluster->getShardsInfo ().size ());
64+ if (context->getSettingsRef ()[Setting::max_parallel_replicas] > 1 )
65+ max_replicas_to_use = std::min (max_replicas_to_use, context->getSettingsRef ()[Setting::max_parallel_replicas].value );
66+
67+ createExtension (predicate, max_replicas_to_use);
6368}
6469
65- void ReadFromCluster::createExtension (const ActionsDAG::Node * predicate)
70+ void ReadFromCluster::createExtension (const ActionsDAG::Node * predicate, size_t number_of_replicas )
6671{
6772 if (extension)
6873 return ;
6974
70- extension = storage->getTaskIteratorExtension (predicate, context);
75+ extension = storage->getTaskIteratorExtension (predicate, context, number_of_replicas );
7176}
7277
7378/// The code executes on initiator
@@ -155,38 +160,54 @@ SinkToStoragePtr IStorageCluster::write(
155160
156161void ReadFromCluster::initializePipeline (QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
157162{
158- createExtension (nullptr );
159-
160163 const Scalars & scalars = context->hasQueryContext () ? context->getQueryContext ()->getScalars () : Scalars{};
161164 const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
162165
163166 Pipes pipes;
164167 auto new_context = updateSettings (context->getSettingsRef ());
165168 const auto & current_settings = new_context->getSettingsRef ();
166169 auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover (current_settings);
170+
171+ size_t replica_index = 0 ;
172+ auto max_replicas_to_use = static_cast <UInt64>(cluster->getShardsInfo ().size ());
173+ if (current_settings[Setting::max_parallel_replicas] > 1 )
174+ max_replicas_to_use = std::min (max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value );
175+
176+ createExtension (nullptr , max_replicas_to_use);
177+
167178 for (const auto & shard_info : cluster->getShardsInfo ())
168179 {
169- auto try_results = shard_info.pool ->getMany (timeouts, current_settings, PoolMode::GET_MANY);
170- for (auto & try_result : try_results)
171- {
172- auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
173- std::vector<IConnectionPool::Entry>{try_result},
174- queryToString (query_to_send),
175- getOutputHeader (),
176- new_context,
177- /* throttler=*/ nullptr ,
178- scalars,
179- Tables (),
180- processed_stage,
181- extension);
182-
183- remote_query_executor->setLogger (log);
184- pipes.emplace_back (std::make_shared<RemoteSource>(
185- remote_query_executor,
186- add_agg_info,
187- current_settings[Setting::async_socket_for_remote],
188- current_settings[Setting::async_query_sending_for_remote]));
189- }
180+ /// We're taking all replicas as shards,
181+ /// so each shard will have only one address to connect to.
182+ auto try_results = shard_info.pool ->getMany (
183+ timeouts,
184+ current_settings,
185+ PoolMode::GET_ONE,
186+ {},
187+ /* skip_unavailable_endpoints=*/ true );
188+
189+ if (try_results.empty ())
190+ continue ;
191+
192+ IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
193+
194+ auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
195+ std::vector<IConnectionPool::Entry>{try_results.front ()},
196+ queryToString (query_to_send),
197+ getOutputHeader (),
198+ new_context,
199+ /* throttler=*/ nullptr ,
200+ scalars,
201+ Tables (),
202+ processed_stage,
203+ RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator , .replica_info = std::move (replica_info)});
204+
205+ remote_query_executor->setLogger (log);
206+ pipes.emplace_back (std::make_shared<RemoteSource>(
207+ remote_query_executor,
208+ add_agg_info,
209+ current_settings[Setting::async_socket_for_remote],
210+ current_settings[Setting::async_query_sending_for_remote]));
190211 }
191212
192213 auto pipe = Pipe::unitePipes (std::move (pipes));
0 commit comments