@@ -172,7 +172,7 @@ class ReadFromStorageS3Step : public SourceStepWithFilter
172172};
173173
174174
175- static Block getBlockWithVirtuals (const NamesAndTypesList & virtual_columns, const String & bucket, const Strings & keys)
175+ static Block getBlockWithVirtuals (const NamesAndTypesList & virtual_columns, const String & bucket, const std::unordered_set<String> & keys)
176176{
177177 Block virtual_columns_block;
178178 fs::path bucket_path (bucket);
@@ -226,31 +226,31 @@ static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, con
226226 return virtual_columns_block;
227227}
228228
229- static void filterKeysForPartitionPruning (std::vector<String> & keys,
230- const String & bucket,
231- const NamesAndTypesList & virtual_columns,
232- const std::vector<ActionsDAGPtr> & filter_dags,
233- ContextPtr context)
229+ static std::vector<String> filterKeysForPartitionPruning (
230+ const std::vector<String> & keys,
231+ const String & bucket,
232+ const NamesAndTypesList & virtual_columns,
233+ const std::vector<ActionsDAGPtr> & filter_dags,
234+ ContextPtr context)
234235{
236+ std::unordered_set<String> result_keys (keys.begin (), keys.end ());
235237 for (const auto & filter_dag : filter_dags)
236238 {
237- if (keys .empty ())
239+ if (result_keys .empty ())
238240 break ;
239241
240- auto block = getBlockWithVirtuals (virtual_columns, bucket, keys );
242+ auto block = getBlockWithVirtuals (virtual_columns, bucket, result_keys );
241243
242244 auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs (block, filter_dag, context);
243245 if (!filter_actions)
244246 continue ;
245247 VirtualColumnUtils::filterBlockWithQuery (filter_actions, block, context);
246248
247- std::unordered_set<String> filtered_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, " _key" );
248- LOG_DEBUG (&Poco::Logger::get (" StorageS3" ), " Applied partition pruning {} from {} keys left" , filtered_keys.size (), keys.size ());
249- keys.clear ();
250- keys.reserve (filtered_keys.size ());
251- for (auto && key : filtered_keys)
252- keys.emplace_back (key);
249+ result_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, " _key" );
253250 }
251+
252+ LOG_DEBUG (&Poco::Logger::get (" StorageS3" ), " Applied partition pruning {} from {} keys left" , result_keys.size (), keys.size ());
253+ return std::vector<String>(result_keys.begin (), result_keys.end ());
254254}
255255
256256class IOutputFormat ;
@@ -1161,8 +1161,7 @@ static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
11611161 }
11621162 else
11631163 {
1164- Strings keys = configuration.keys ;
1165- filterKeysForPartitionPruning (keys, configuration.url .bucket , virtual_columns, filter_dags, local_context);
1164+ Strings keys = filterKeysForPartitionPruning (configuration.keys , configuration.url .bucket , virtual_columns, filter_dags, local_context);
11661165 return std::make_shared<StorageS3Source::KeysIterator>(
11671166 *configuration.client , configuration.url .version_id , keys,
11681167 configuration.url .bucket , configuration.request_settings , read_keys, file_progress_callback);
@@ -1266,6 +1265,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
12661265
12671266void ReadFromStorageS3Step::applyFilters ()
12681267{
1268+ // / We will use filter_dags in filterKeysForPartitionPruning called from initializePipeline, nothing to do here
12691269}
12701270
12711271SinkToStoragePtr StorageS3::write (const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /* async_insert*/ )
0 commit comments