Skip to content

Commit 31a6c7c

Browse files
committed
Style changes around filterKeysForPartitionPruning
1 parent 95e9a27 commit 31a6c7c

2 files changed

Lines changed: 18 additions & 16 deletions

File tree

src/Storages/StorageS3.cpp

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class ReadFromStorageS3Step : public SourceStepWithFilter
172172
};
173173

174174

175-
static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const Strings & keys)
175+
static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, const String & bucket, const std::unordered_set<String> & keys)
176176
{
177177
Block virtual_columns_block;
178178
fs::path bucket_path(bucket);
@@ -226,31 +226,31 @@ static Block getBlockWithVirtuals(const NamesAndTypesList & virtual_columns, con
226226
return virtual_columns_block;
227227
}
228228

229-
static void filterKeysForPartitionPruning(std::vector<String> & keys,
230-
const String & bucket,
231-
const NamesAndTypesList & virtual_columns,
232-
const std::vector<ActionsDAGPtr> & filter_dags,
233-
ContextPtr context)
229+
static std::vector<String> filterKeysForPartitionPruning(
230+
const std::vector<String> & keys,
231+
const String & bucket,
232+
const NamesAndTypesList & virtual_columns,
233+
const std::vector<ActionsDAGPtr> & filter_dags,
234+
ContextPtr context)
234235
{
236+
std::unordered_set<String> result_keys(keys.begin(), keys.end());
235237
for (const auto & filter_dag : filter_dags)
236238
{
237-
if (keys.empty())
239+
if (result_keys.empty())
238240
break;
239241

240-
auto block = getBlockWithVirtuals(virtual_columns, bucket, keys);
242+
auto block = getBlockWithVirtuals(virtual_columns, bucket, result_keys);
241243

242244
auto filter_actions = VirtualColumnUtils::splitFilterDagForAllowedInputs(block, filter_dag, context);
243245
if (!filter_actions)
244246
continue;
245247
VirtualColumnUtils::filterBlockWithQuery(filter_actions, block, context);
246248

247-
std::unordered_set<String> filtered_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key");
248-
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning {} from {} keys left", filtered_keys.size(), keys.size());
249-
keys.clear();
250-
keys.reserve(filtered_keys.size());
251-
for (auto && key : filtered_keys)
252-
keys.emplace_back(key);
249+
result_keys = VirtualColumnUtils::extractSingleValueFromBlock<String>(block, "_key");
253250
}
251+
252+
LOG_DEBUG(&Poco::Logger::get("StorageS3"), "Applied partition pruning {} from {} keys left", result_keys.size(), keys.size());
253+
return std::vector<String>(result_keys.begin(), result_keys.end());
254254
}
255255

256256
class IOutputFormat;
@@ -1161,8 +1161,7 @@ static std::shared_ptr<StorageS3Source::IIterator> createFileIterator(
11611161
}
11621162
else
11631163
{
1164-
Strings keys = configuration.keys;
1165-
filterKeysForPartitionPruning(keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
1164+
Strings keys = filterKeysForPartitionPruning(configuration.keys, configuration.url.bucket, virtual_columns, filter_dags, local_context);
11661165
return std::make_shared<StorageS3Source::KeysIterator>(
11671166
*configuration.client, configuration.url.version_id, keys,
11681167
configuration.url.bucket, configuration.request_settings, read_keys, file_progress_callback);
@@ -1266,6 +1265,7 @@ void ReadFromStorageS3Step::initializePipeline(QueryPipelineBuilder & pipeline,
12661265

12671266
void ReadFromStorageS3Step::applyFilters()
12681267
{
1268+
/// We will use filter_dags in filterKeysForPartitionPruning called from initializePipeline, nothing to do here
12691269
}
12701270

12711271
SinkToStoragePtr StorageS3::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool /*async_insert*/)

src/Storages/VirtualColumnUtils.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ bool prepareFilterBlockWithQuery(const ASTPtr & query, ContextPtr context, Block
3434
/// If `expression_ast` is passed, use it to filter block.
3535
void filterBlockWithQuery(const ASTPtr & query, Block & block, ContextPtr context, ASTPtr expression_ast = {});
3636
void filterBlockWithQuery(ActionsDAGPtr dag, Block & block, ContextPtr context);
37+
38+
/// Extract subset of filter_dag that can be evaluated using only columns from header
3739
ActionsDAGPtr splitFilterDagForAllowedInputs(const Block & header, const ActionsDAGPtr & filter_dag, ContextPtr context);
3840

3941
/// Extract from the input stream a set of `name` column values

0 commit comments

Comments
 (0)