Skip to content

Commit 91627b0

Browse files
tuanpachzvonand
authored andcommitted
Merge pull request ClickHouse#92339 from tuanpach/ddl-worker-mark-replicas-active-on-new-host-ids
Check and mark the interserver IO address active in DDL worker
1 parent 38e8e00 commit 91627b0

10 files changed

Lines changed: 245 additions & 55 deletions

File tree

src/Interpreters/Context.cpp

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4988,25 +4988,32 @@ void Context::startClusterDiscovery()
49884988
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
49894989
void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_discovery, const String & config_name)
49904990
{
4991-
std::lock_guard lock(shared->clusters_mutex);
4992-
if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery)
49934991
{
4994-
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext(), getMacros());
4995-
}
4992+
std::lock_guard lock(shared->clusters_mutex);
4993+
if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery)
4994+
{
4995+
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext(), getMacros());
4996+
}
49964997

4997-
/// Do not update clusters if this part of config wasn't changed.
4998-
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
4999-
return;
4998+
/// Do not update clusters if this part of config wasn't changed.
4999+
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
5000+
return;
50005001

5001-
auto old_clusters_config = shared->clusters_config;
5002-
shared->clusters_config = config;
5002+
auto old_clusters_config = shared->clusters_config;
5003+
shared->clusters_config = config;
50035004

5004-
if (!shared->clusters)
5005-
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, *settings, getMacros(), config_name);
5006-
else
5007-
shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config);
5005+
if (!shared->clusters)
5006+
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, *settings, getMacros(), config_name);
5007+
else
5008+
shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config);
50085009

5009-
++shared->clusters_version;
5010+
++shared->clusters_version;
5011+
}
5012+
{
5013+
SharedLockGuard lock(shared->mutex);
5014+
if (shared->ddl_worker)
5015+
shared->ddl_worker->notifyHostIDsUpdated();
5016+
}
50105017
}
50115018

50125019
size_t Context::getClustersVersion() const

src/Interpreters/DDLWorker.cpp

Lines changed: 110 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
#include <Core/ServerSettings.h>
33
#include <Core/ServerUUID.h>
44
#include <Core/Settings.h>
5+
#include <Databases/DatabaseReplicated.h>
56
#include <IO/NullWriteBuffer.h>
67
#include <IO/ReadBufferFromString.h>
78
#include <IO/ReadHelpers.h>
89
#include <IO/WriteHelpers.h>
910
#include <Interpreters/Cluster.h>
1011
#include <Interpreters/Context.h>
11-
#include <Interpreters/DDLTask.h>
1212
#include <Interpreters/DDLWorker.h>
1313
#include <Interpreters/DatabaseCatalog.h>
1414
#include <Interpreters/ZooKeeperLog.h>
@@ -188,6 +188,26 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
188188
return current_zookeeper;
189189
}
190190

191+
void DDLWorker::notifyHostIDsUpdated()
192+
{
193+
LOG_INFO(log, "Host IDs updated");
194+
host_ids_updated = true;
195+
}
196+
197+
void DDLWorker::updateHostIDs(const std::vector<HostID> & hosts)
198+
{
199+
std::lock_guard lock{checked_host_id_set_mutex};
200+
for (const auto & host : hosts)
201+
{
202+
if (!checked_host_id_set.contains(host.toString()))
203+
{
204+
LOG_INFO(log, "Found new host ID: {}", host.toString());
205+
notifyHostIDsUpdated();
206+
return;
207+
}
208+
}
209+
}
210+
191211

192212
DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool /*dry_run*/)
193213
{
@@ -223,6 +243,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
223243
{
224244
/// Stage 1: parse entry
225245
task->entry.parse(node_data);
246+
updateHostIDs(task->entry.hosts);
226247
}
227248
catch (...)
228249
{
@@ -1139,7 +1160,7 @@ bool DDLWorker::initializeMainThread()
11391160
auto zookeeper = getAndSetZooKeeper();
11401161
zookeeper->createAncestors(fs::path(queue_dir) / "");
11411162
initializeReplication();
1142-
markReplicasActive(true);
1163+
markReplicasActive(/*reinitialized=*/true);
11431164
initialized = true;
11441165
return true;
11451166
}
@@ -1194,7 +1215,7 @@ void DDLWorker::runMainThread()
11941215

11951216

11961217
setThreadName("DDLWorker");
1197-
LOG_DEBUG(log, "Starting DDLWorker thread");
1218+
LOG_INFO(log, "Starting DDLWorker thread");
11981219

11991220
while (!stop_flag)
12001221
{
@@ -1203,14 +1224,18 @@ void DDLWorker::runMainThread()
12031224
bool reinitialized = !initialized;
12041225

12051226
/// Reinitialize DDLWorker state (including ZooKeeper connection) if required
1206-
if (!initialized)
1227+
if (reinitialized)
12071228
{
12081229
/// Stopped
12091230
if (!initializeMainThread())
12101231
break;
1232+
12111233
LOG_DEBUG(log, "Initialized DDLWorker thread");
12121234
}
12131235

1236+
if (host_ids_updated.exchange(false))
1237+
markReplicasActive(/*reinitialized=*/false);
1238+
12141239
cleanup_event->set();
12151240
scheduleTasks(reinitialized);
12161241
subsequent_errors_count = 0;
@@ -1269,61 +1294,104 @@ void DDLWorker::runMainThread()
12691294
void DDLWorker::initializeReplication()
12701295
{
12711296
auto zookeeper = getZooKeeper();
1272-
12731297
zookeeper->createAncestors(fs::path(replicas_dir) / "");
1274-
1275-
NameSet host_id_set;
1276-
for (const auto & it : context->getClusters())
1277-
{
1278-
auto cluster = it.second;
1279-
for (const auto & host_ids : cluster->getHostIDs())
1280-
for (const auto & host_id : host_ids)
1281-
host_id_set.emplace(host_id);
1282-
}
1283-
1284-
createReplicaDirs(zookeeper, host_id_set);
12851298
}
12861299

12871300
void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
12881301
{
12891302
for (const auto & host_id : host_ids)
1303+
{
1304+
LOG_INFO(log, "Creating replica dir for host id {}", host_id);
12901305
zookeeper->createAncestors(fs::path(replicas_dir) / host_id / "");
1306+
}
12911307
}
12921308

1293-
void DDLWorker::markReplicasActive(bool /*reinitialized*/)
1309+
void DDLWorker::markReplicasActive(bool reinitialized)
12941310
{
12951311
auto zookeeper = getZooKeeper();
1312+
const auto maybe_secure_port = context->getTCPPortSecure();
1313+
const auto port = context->getTCPPort();
1314+
1315+
auto all_host_ids = getAllHostIDsFromClusters();
12961316

1297-
// Reset all active_node_holders
1298-
for (auto & it : active_node_holders)
1317+
// Add interserver IO host IDs for Replicated DBs
1318+
try
1319+
{
1320+
auto host_port = context->getInterserverIOAddress();
1321+
HostID interserver_io_host_id = {host_port.first, port};
1322+
all_host_ids.emplace(interserver_io_host_id.toString());
1323+
LOG_INFO(log, "Add interserver IO host ID {}", interserver_io_host_id.toString());
1324+
if (maybe_secure_port)
1325+
{
1326+
HostID interserver_io_secure_host_id = {host_port.first, *maybe_secure_port};
1327+
all_host_ids.emplace(interserver_io_secure_host_id.toString());
1328+
LOG_INFO(log, "Add interserver IO secure host ID {}", interserver_io_secure_host_id.toString());
1329+
}
1330+
}
1331+
catch (const Exception & e)
12991332
{
1300-
auto & active_node_holder = it.second.second;
1301-
if (active_node_holder)
1302-
active_node_holder->setAlreadyRemoved();
1303-
active_node_holder.reset();
1333+
LOG_INFO(log, "Unable to get interserver IO address, error {}", e.what());
13041334
}
13051335

1306-
active_node_holders.clear();
1336+
createReplicaDirs(zookeeper, all_host_ids);
1337+
1338+
if (reinitialized)
1339+
{
1340+
// Reset all active_node_holders
1341+
for (auto & it : active_node_holders)
1342+
{
1343+
auto & active_node_holder = it.second.second;
1344+
if (active_node_holder)
1345+
active_node_holder->setAlreadyRemoved();
1346+
active_node_holder.reset();
1347+
}
1348+
active_node_holders.clear();
1349+
}
13071350

1308-
const auto maybe_secure_port = context->getTCPPortSecure();
1309-
const auto port = context->getTCPPort();
13101351

13111352
Coordination::Stat replicas_stat;
13121353
Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
13131354
NameSet local_host_ids;
1355+
NameSet checking_host_ids;
1356+
checking_host_ids.reserve(host_ids.size());
13141357
for (const auto & host_id : host_ids)
13151358
{
1359+
bool is_self_host = false;
13161360
try
13171361
{
13181362
HostID host = HostID::fromString(host_id);
1319-
if (DDLTask::isSelfHostID(log, host, maybe_secure_port, port))
1320-
local_host_ids.emplace(host_id);
1363+
checking_host_ids.insert(host.toString());
1364+
1365+
is_self_host = DDLTask::isSelfHostID(log, host, maybe_secure_port, port);
13211366
}
13221367
catch (const Exception & e)
13231368
{
13241369
LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
13251370
continue;
13261371
}
1372+
1373+
LOG_INFO(log, "Self host_id ({}) = {}", host_id, is_self_host);
1374+
if (is_self_host)
1375+
{
1376+
local_host_ids.emplace(host_id);
1377+
continue;
1378+
}
1379+
1380+
if (!reinitialized)
1381+
{
1382+
/// Remove this host_id from active_node_holders
1383+
auto it = active_node_holders.find(host_id);
1384+
if (it != active_node_holders.end())
1385+
{
1386+
auto & active_node_holder = it->second.second;
1387+
if (active_node_holder)
1388+
active_node_holder->setAlreadyRemoved();
1389+
active_node_holder.reset();
1390+
1391+
active_node_holders.erase(it);
1392+
}
1393+
continue;
1394+
}
13271395
}
13281396

13291397
for (const auto & host_id : local_host_ids)
@@ -1371,17 +1439,9 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
13711439
active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
13721440
}
13731441

1374-
if (active_node_holders.empty())
13751442
{
1376-
for (const auto & it : context->getClusters())
1377-
{
1378-
const auto & cluster = it.second;
1379-
if (!cluster->getHostIDs().empty())
1380-
{
1381-
LOG_WARNING(log, "There are clusters with host ids but no local host found for this replica.");
1382-
break;
1383-
}
1384-
}
1443+
std::lock_guard lock{checked_host_id_set_mutex};
1444+
checked_host_id_set = checking_host_ids;
13851445
}
13861446
}
13871447

@@ -1448,4 +1508,16 @@ void DDLWorker::runCleanupThread()
14481508
}
14491509
}
14501510

1511+
NameSet DDLWorker::getAllHostIDsFromClusters() const
1512+
{
1513+
NameSet host_id_set;
1514+
for (const auto & it : context->getClusters())
1515+
{
1516+
auto cluster = it.second;
1517+
for (const auto & host_ids : cluster->getHostIDs())
1518+
for (const auto & host_id : host_ids)
1519+
host_id_set.emplace(host_id);
1520+
}
1521+
return host_id_set;
1522+
}
14511523
}

src/Interpreters/DDLWorker.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
#pragma once
22

3+
#include <Core/Names.h>
4+
#include <Interpreters/Context_fwd.h>
5+
#include <Interpreters/DDLTask.h>
36
#include <Parsers/IAST_fwd.h>
47
#include <Storages/IStorage_fwd.h>
8+
#include <Poco/Event.h>
59
#include <Common/CurrentMetrics.h>
610
#include <Common/CurrentThread.h>
711
#include <Common/DNSResolver.h>
812
#include <Common/ThreadPool_fwd.h>
913
#include <Common/ZooKeeper/IKeeper.h>
1014
#include <Common/ZooKeeper/ZooKeeper.h>
11-
#include <Interpreters/Context_fwd.h>
12-
#include <Poco/Event.h>
1315

1416
#include <atomic>
1517
#include <list>
@@ -94,6 +96,9 @@ class DDLWorker
9496
/// Should be called in `initializeMainThread` only, so if it is expired, `runMainThread` will reinitialized the state.
9597
ZooKeeperPtr getAndSetZooKeeper();
9698

99+
void notifyHostIDsUpdated();
100+
void updateHostIDs(const std::vector<HostID> & hosts);
101+
97102
protected:
98103

99104
class ConcurrentSet
@@ -173,6 +178,8 @@ class DDLWorker
173178
void runMainThread();
174179
void runCleanupThread();
175180

181+
NameSet getAllHostIDsFromClusters() const;
182+
176183
ContextMutablePtr context;
177184
LoggerPtr log;
178185

@@ -209,6 +216,7 @@ class DDLWorker
209216

210217
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
211218
Int64 cleanup_delay_period = 60; // minute (in seconds)
219+
std::atomic_bool host_ids_updated{false};
212220
/// Delete node if its age is greater than that
213221
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
214222
/// How many tasks could be in the queue
@@ -221,6 +229,10 @@ class DDLWorker
221229
std::atomic_uint64_t subsequent_errors_count = 0;
222230
String last_unexpected_error;
223231

232+
mutable std::mutex checked_host_id_set_mutex;
233+
NameSet checked_host_id_set TSA_GUARDED_BY(checked_host_id_set_mutex);
234+
235+
224236
const CurrentMetrics::Metric * max_entry_metric;
225237
const CurrentMetrics::Metric * max_pushed_entry_metric;
226238

src/Interpreters/DistributedQueryStatusSource.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ NameSet DistributedQueryStatusSource::getOfflineHosts(const NameSet & hosts_to_w
102102
if (offline.size() == hosts_to_wait.size())
103103
{
104104
/// Avoid reporting that all hosts are offline
105-
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
105+
LOG_WARNING(
106+
log, "Did not find active hosts, will wait for all hosts: {}. This should not happen often", fmt::join(hosts_to_wait, ", "));
106107
return {};
107108
}
108109

src/Interpreters/executeDDLQueryOnCluster.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
188188
entry.setSettingsIfRequired(context);
189189
entry.tracing_context = OpenTelemetry::CurrentContext();
190190
entry.initial_query_id = context->getClientInfo().initial_query_id;
191+
ddl_worker.updateHostIDs(entry.hosts);
191192
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info);
192193

193194
return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);

tests/integration/test_distributed_ddl_on_database_cluster/__init__.py

Whitespace-only changes.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
<clickhouse>
2+
<remote_servers>
3+
</remote_servers>
4+
</clickhouse>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<clickhouse>
2+
<distributed_ddl>
3+
<path>/clickhouse/task_queue/ddl</path>
4+
<max_tasks_in_queue>10</max_tasks_in_queue>
5+
<task_max_lifetime>3600</task_max_lifetime>
6+
<cleanup_delay_period>5</cleanup_delay_period>
7+
</distributed_ddl>
8+
</clickhouse>

0 commit comments

Comments
 (0)