Skip to content

Commit 12fb72f

Browse files
adikusianton-ru
authored andcommitted
Change object storage cluster table functions to prefer specific replicas to improve cache locality
1 parent da8404c commit 12fb72f

14 files changed

Lines changed: 274 additions & 47 deletions

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,12 @@ void RemoteQueryExecutor::processReadTaskRequest()
723723
if (!extension || !extension->task_iterator)
724724
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
725725

726+
if (!extension->replica_info)
727+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized");
728+
726729
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
727-
auto response = (*extension->task_iterator)();
730+
731+
auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica);
728732
connections->sendReadTaskResponse(response);
729733
}
730734

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class RemoteQueryExecutorReadContext;
3030
class ParallelReplicasReadingCoordinator;
3131

3232
/// This is the same type as StorageS3Source::IteratorWrapper
33-
using TaskIterator = std::function<String()>;
33+
using TaskIterator = std::function<String(size_t)>;
3434

3535
/// This class allows one to launch queries on remote replicas of one shard and get results
3636
class RemoteQueryExecutor

src/Storages/IStorageCluster.cpp

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ namespace Setting
3434
extern const SettingsBool async_query_sending_for_remote;
3535
extern const SettingsBool async_socket_for_remote;
3636
extern const SettingsBool skip_unavailable_shards;
37+
extern const SettingsNonZeroUInt64 max_parallel_replicas;
3738
}
3839

3940
namespace ErrorCodes
@@ -59,15 +60,19 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
5960
if (filter_actions_dag)
6061
predicate = filter_actions_dag->getOutputs().at(0);
6162

62-
createExtension(predicate);
63+
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
64+
if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
65+
max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);
66+
67+
createExtension(predicate, max_replicas_to_use);
6368
}
6469

65-
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
70+
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
6671
{
6772
if (extension)
6873
return;
6974

70-
extension = storage->getTaskIteratorExtension(predicate, context);
75+
extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
7176
}
7277

7378
/// The code executes on initiator
@@ -155,38 +160,54 @@ SinkToStoragePtr IStorageCluster::write(
155160

156161
void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
157162
{
158-
createExtension(nullptr);
159-
160163
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
161164
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
162165

163166
Pipes pipes;
164167
auto new_context = updateSettings(context->getSettingsRef());
165168
const auto & current_settings = new_context->getSettingsRef();
166169
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
170+
171+
size_t replica_index = 0;
172+
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
173+
if (current_settings[Setting::max_parallel_replicas] > 1)
174+
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
175+
176+
createExtension(nullptr, max_replicas_to_use);
177+
167178
for (const auto & shard_info : cluster->getShardsInfo())
168179
{
169-
auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY);
170-
for (auto & try_result : try_results)
171-
{
172-
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
173-
std::vector<IConnectionPool::Entry>{try_result},
174-
queryToString(query_to_send),
175-
getOutputHeader(),
176-
new_context,
177-
/*throttler=*/nullptr,
178-
scalars,
179-
Tables(),
180-
processed_stage,
181-
extension);
182-
183-
remote_query_executor->setLogger(log);
184-
pipes.emplace_back(std::make_shared<RemoteSource>(
185-
remote_query_executor,
186-
add_agg_info,
187-
current_settings[Setting::async_socket_for_remote],
188-
current_settings[Setting::async_query_sending_for_remote]));
189-
}
180+
/// We're taking all replicas as shards,
181+
/// so each shard will have only one address to connect to.
182+
auto try_results = shard_info.pool->getMany(
183+
timeouts,
184+
current_settings,
185+
PoolMode::GET_ONE,
186+
{},
187+
/*skip_unavailable_endpoints=*/true);
188+
189+
if (try_results.empty())
190+
continue;
191+
192+
IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
193+
194+
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
195+
std::vector<IConnectionPool::Entry>{try_results.front()},
196+
queryToString(query_to_send),
197+
getOutputHeader(),
198+
new_context,
199+
/*throttler=*/nullptr,
200+
scalars,
201+
Tables(),
202+
processed_stage,
203+
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
204+
205+
remote_query_executor->setLogger(log);
206+
pipes.emplace_back(std::make_shared<RemoteSource>(
207+
remote_query_executor,
208+
add_agg_info,
209+
current_settings[Setting::async_socket_for_remote],
210+
current_settings[Setting::async_query_sending_for_remote]));
190211
}
191212

192213
auto pipe = Pipe::unitePipes(std::move(pipes));

src/Storages/IStorageCluster.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class IStorageCluster : public IStorage
4040

4141
ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
4242
/// Query is needed for pruning by virtual columns (_file, _path)
43-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
43+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
4444

4545
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4646

@@ -127,7 +127,7 @@ class ReadFromCluster : public SourceStepWithFilter
127127

128128
std::optional<RemoteQueryExecutor::Extension> extension;
129129

130-
void createExtension(const ActionsDAG::Node * predicate);
130+
void createExtension(const ActionsDAG::Node * predicate, const size_t number_of_replicas);
131131
ContextPtr updateSettings(const Settings & settings);
132132
};
133133

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include <Storages/ObjectStorage/Utils.h>
1818
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
1919
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h>
20+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
21+
2022

2123
namespace DB
2224
{
@@ -279,19 +281,19 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
279281
}
280282

281283
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
282-
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
284+
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
283285
{
284286
auto iterator = StorageObjectStorageSource::createFileIterator(
285287
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
286288
local_context, predicate, getVirtualsList(), nullptr, local_context->getFileProgressCallback());
287289

288-
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
289-
{
290-
auto object_info = iterator->next(0);
291-
if (object_info)
292-
return object_info->getPath();
293-
return "";
294-
});
290+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
291+
292+
auto callback = std::make_shared<TaskIterator>(
293+
[task_distributor](size_t number_of_current_replica) mutable -> String {
294+
return task_distributor->getNextTask(number_of_current_replica).value_or("");
295+
});
296+
295297
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
296298
}
297299

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class StorageObjectStorageCluster : public IStorageCluster
3030
std::string getName() const override;
3131

3232
RemoteQueryExecutor::Extension getTaskIteratorExtension(
33-
const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
33+
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
3434

3535
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
3636

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#include "StorageObjectStorageStableTaskDistributor.h"
2+
#include <Common/SipHash.h>
3+
#include <consistent_hashing.h>
4+
#include <optional>
5+
6+
namespace DB
7+
{
8+
9+
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
10+
std::shared_ptr<IObjectIterator> iterator_,
11+
size_t number_of_replicas_)
12+
: iterator(std::move(iterator_))
13+
, connection_to_files(number_of_replicas_)
14+
, iterator_exhausted(false)
15+
{
16+
}
17+
18+
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
19+
{
20+
LOG_TRACE(
21+
log,
22+
"Received a new connection from replica {} looking for a file",
23+
number_of_current_replica
24+
);
25+
26+
// 1. Check pre-queued files first
27+
if (auto file = getPreQueuedFile(number_of_current_replica))
28+
return file;
29+
30+
// 2. Try to find a matching file from the iterator
31+
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
32+
return file;
33+
34+
// 3. Process unprocessed files if iterator is exhausted
35+
return getAnyUnprocessedFile(number_of_current_replica);
36+
}
37+
38+
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
39+
{
40+
return ConsistentHashing(sipHash64(file_path), connection_to_files.size());
41+
}
42+
43+
std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
44+
{
45+
std::lock_guard lock(mutex);
46+
47+
auto & files = connection_to_files[number_of_current_replica];
48+
49+
while (!files.empty())
50+
{
51+
String next_file = files.back();
52+
files.pop_back();
53+
54+
auto it = unprocessed_files.find(next_file);
55+
if (it == unprocessed_files.end())
56+
continue;
57+
58+
unprocessed_files.erase(it);
59+
60+
LOG_TRACE(
61+
log,
62+
"Assigning pre-queued file {} to replica {}",
63+
next_file,
64+
number_of_current_replica
65+
);
66+
67+
return next_file;
68+
}
69+
70+
return std::nullopt;
71+
}
72+
73+
std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica)
74+
{
75+
{
76+
std::lock_guard lock(mutex);
77+
if (iterator_exhausted)
78+
return std::nullopt;
79+
}
80+
81+
while (true)
82+
{
83+
ObjectInfoPtr object_info;
84+
85+
{
86+
std::lock_guard lock(mutex);
87+
object_info = iterator->next(0);
88+
89+
if (!object_info)
90+
{
91+
iterator_exhausted = true;
92+
break;
93+
}
94+
}
95+
96+
String file_path;
97+
98+
auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
99+
if (archive_object_info)
100+
{
101+
file_path = archive_object_info->getPathToArchive();
102+
}
103+
else
104+
{
105+
file_path = object_info->getPath();
106+
}
107+
108+
size_t file_replica_idx = getReplicaForFile(file_path);
109+
if (file_replica_idx == number_of_current_replica)
110+
{
111+
LOG_TRACE(
112+
log,
113+
"Found file {} for replica {}",
114+
file_path,
115+
number_of_current_replica
116+
);
117+
118+
return file_path;
119+
}
120+
121+
// Queue file for its assigned replica
122+
{
123+
std::lock_guard lock(mutex);
124+
unprocessed_files.insert(file_path);
125+
connection_to_files[file_replica_idx].push_back(file_path);
126+
}
127+
}
128+
129+
return std::nullopt;
130+
}
131+
132+
std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
133+
{
134+
std::lock_guard lock(mutex);
135+
136+
if (!unprocessed_files.empty())
137+
{
138+
auto it = unprocessed_files.begin();
139+
String next_file = *it;
140+
unprocessed_files.erase(it);
141+
142+
LOG_TRACE(
143+
log,
144+
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
145+
next_file,
146+
number_of_current_replica
147+
);
148+
149+
return next_file;
150+
}
151+
152+
return std::nullopt;
153+
}
154+
155+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
2+
3+
#include <Client/Connection.h>
4+
#include <Common/Logger.h>
5+
#include <Interpreters/Cluster.h>
6+
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
7+
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
8+
#include <unordered_set>
9+
#include <vector>
10+
#include <mutex>
11+
#include <memory>
12+
13+
namespace DB
14+
{
15+
16+
class StorageObjectStorageStableTaskDistributor
17+
{
18+
public:
19+
using IObjectIterator = StorageObjectStorageSource::IIterator;
20+
using ObjectInfoPtr = StorageObjectStorage::ObjectInfoPtr;
21+
22+
StorageObjectStorageStableTaskDistributor(
23+
std::shared_ptr<IObjectIterator> iterator_,
24+
size_t number_of_replicas_);
25+
26+
std::optional<String> getNextTask(size_t number_of_current_replica);
27+
28+
private:
29+
size_t getReplicaForFile(const String & file_path);
30+
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
31+
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
32+
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
33+
34+
std::shared_ptr<IObjectIterator> iterator;
35+
36+
std::vector<std::vector<String>> connection_to_files;
37+
std::unordered_set<String> unprocessed_files;
38+
39+
std::mutex mutex;
40+
bool iterator_exhausted = false;
41+
42+
LoggerPtr log = getLogger("StorageClusterTaskDistributor");
43+
};
44+
45+
}

0 commit comments

Comments
 (0)