Skip to content

Commit 13ba08e

Browse files
Backport #92339 to 25.11: Check and mark the interserver IO address active in DDL worker
1 parent 6c9708c commit 13ba08e

10 files changed

Lines changed: 246 additions & 57 deletions

File tree

src/Interpreters/Context.cpp

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5220,25 +5220,32 @@ void Context::startClusterDiscovery()
52205220
/// On repeating calls updates existing clusters and adds new clusters, doesn't delete old clusters
52215221
void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_discovery, const String & config_name)
52225222
{
5223-
std::lock_guard lock(shared->clusters_mutex);
5224-
if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery)
52255223
{
5226-
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext(), getMacros());
5227-
}
5224+
std::lock_guard lock(shared->clusters_mutex);
5225+
if (ConfigHelper::getBool(*config, "allow_experimental_cluster_discovery") && enable_discovery && !shared->cluster_discovery)
5226+
{
5227+
shared->cluster_discovery = std::make_unique<ClusterDiscovery>(*config, getGlobalContext(), getMacros());
5228+
}
52285229

5229-
/// Do not update clusters if this part of config wasn't changed.
5230-
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
5231-
return;
5230+
/// Do not update clusters if this part of config wasn't changed.
5231+
if (shared->clusters && isSameConfiguration(*config, *shared->clusters_config, config_name))
5232+
return;
52325233

5233-
auto old_clusters_config = shared->clusters_config;
5234-
shared->clusters_config = config;
5234+
auto old_clusters_config = shared->clusters_config;
5235+
shared->clusters_config = config;
52355236

5236-
if (!shared->clusters)
5237-
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, *settings, getMacros(), config_name);
5238-
else
5239-
shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config);
5237+
if (!shared->clusters)
5238+
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, *settings, getMacros(), config_name);
5239+
else
5240+
shared->clusters->updateClusters(*shared->clusters_config, *settings, config_name, old_clusters_config);
52405241

5241-
++shared->clusters_version;
5242+
++shared->clusters_version;
5243+
}
5244+
{
5245+
SharedLockGuard lock(shared->mutex);
5246+
if (shared->ddl_worker)
5247+
shared->ddl_worker->notifyHostIDsUpdated();
5248+
}
52425249
}
52435250

52445251
size_t Context::getClustersVersion() const

src/Interpreters/DDLWorker.cpp

Lines changed: 111 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
#include <Core/ServerSettings.h>
33
#include <Core/ServerUUID.h>
44
#include <Core/Settings.h>
5+
#include <Databases/DatabaseReplicated.h>
56
#include <IO/NullWriteBuffer.h>
67
#include <IO/ReadBufferFromString.h>
78
#include <IO/ReadHelpers.h>
89
#include <IO/WriteHelpers.h>
910
#include <Interpreters/Cluster.h>
1011
#include <Interpreters/Context.h>
11-
#include <Interpreters/DDLTask.h>
1212
#include <Interpreters/DDLWorker.h>
1313
#include <Interpreters/DatabaseCatalog.h>
1414
#include <Interpreters/ZooKeeperLog.h>
@@ -189,6 +189,26 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
189189
return current_zookeeper;
190190
}
191191

192+
void DDLWorker::notifyHostIDsUpdated()
193+
{
194+
LOG_INFO(log, "Host IDs updated");
195+
host_ids_updated = true;
196+
}
197+
198+
void DDLWorker::updateHostIDs(const std::vector<HostID> & hosts)
199+
{
200+
std::lock_guard lock{checked_host_id_set_mutex};
201+
for (const auto & host : hosts)
202+
{
203+
if (!checked_host_id_set.contains(host.toString()))
204+
{
205+
LOG_INFO(log, "Found new host ID: {}", host.toString());
206+
notifyHostIDsUpdated();
207+
return;
208+
}
209+
}
210+
}
211+
192212

193213
DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool /*dry_run*/)
194214
{
@@ -224,6 +244,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
224244
{
225245
/// Stage 1: parse entry
226246
task->entry.parse(node_data);
247+
updateHostIDs(task->entry.hosts);
227248
}
228249
catch (...)
229250
{
@@ -1142,7 +1163,7 @@ bool DDLWorker::initializeMainThread()
11421163
auto zookeeper = getAndSetZooKeeper();
11431164
zookeeper->createAncestors(fs::path(queue_dir) / "");
11441165
initializeReplication();
1145-
markReplicasActive(true);
1166+
markReplicasActive(/*reinitialized=*/true);
11461167
initialized = true;
11471168
return true;
11481169
}
@@ -1197,23 +1218,26 @@ void DDLWorker::runMainThread()
11971218

11981219

11991220
DB::setThreadName(ThreadName::DDL_WORKER);
1200-
LOG_DEBUG(log, "Starting DDLWorker thread");
1201-
1221+
LOG_INFO(log, "Starting DDLWorker thread");
12021222
while (!stop_flag)
12031223
{
12041224
try
12051225
{
12061226
bool reinitialized = !initialized;
12071227

12081228
/// Reinitialize DDLWorker state (including ZooKeeper connection) if required
1209-
if (!initialized)
1229+
if (reinitialized)
12101230
{
12111231
/// Stopped
12121232
if (!initializeMainThread())
12131233
break;
1234+
12141235
LOG_DEBUG(log, "Initialized DDLWorker thread");
12151236
}
12161237

1238+
if (host_ids_updated.exchange(false))
1239+
markReplicasActive(/*reinitialized=*/false);
1240+
12171241
cleanup_event->set();
12181242
scheduleTasks(reinitialized);
12191243
subsequent_errors_count = 0;
@@ -1272,61 +1296,104 @@ void DDLWorker::runMainThread()
12721296
void DDLWorker::initializeReplication()
12731297
{
12741298
auto zookeeper = getZooKeeper();
1275-
12761299
zookeeper->createAncestors(fs::path(replicas_dir) / "");
1277-
1278-
NameSet host_id_set;
1279-
for (const auto & it : context->getClusters())
1280-
{
1281-
auto cluster = it.second;
1282-
for (const auto & host_ids : cluster->getHostIDs())
1283-
for (const auto & host_id : host_ids)
1284-
host_id_set.emplace(host_id);
1285-
}
1286-
1287-
createReplicaDirs(zookeeper, host_id_set);
12881300
}
12891301

12901302
void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
12911303
{
12921304
for (const auto & host_id : host_ids)
1305+
{
1306+
LOG_INFO(log, "Creating replica dir for host id {}", host_id);
12931307
zookeeper->createAncestors(fs::path(replicas_dir) / host_id / "");
1308+
}
12941309
}
12951310

1296-
void DDLWorker::markReplicasActive(bool /*reinitialized*/)
1311+
void DDLWorker::markReplicasActive(bool reinitialized)
12971312
{
12981313
auto zookeeper = getZooKeeper();
1314+
const auto maybe_secure_port = context->getTCPPortSecure();
1315+
const auto port = context->getTCPPort();
12991316

1300-
// Reset all active_node_holders
1301-
for (auto & it : active_node_holders)
1317+
auto all_host_ids = getAllHostIDsFromClusters();
1318+
1319+
// Add interserver IO host IDs for Replicated DBs
1320+
try
1321+
{
1322+
auto host_port = context->getInterserverIOAddress();
1323+
HostID interserver_io_host_id = {host_port.first, port};
1324+
all_host_ids.emplace(interserver_io_host_id.toString());
1325+
LOG_INFO(log, "Add interserver IO host ID {}", interserver_io_host_id.toString());
1326+
if (maybe_secure_port)
1327+
{
1328+
HostID interserver_io_secure_host_id = {host_port.first, *maybe_secure_port};
1329+
all_host_ids.emplace(interserver_io_secure_host_id.toString());
1330+
LOG_INFO(log, "Add interserver IO secure host ID {}", interserver_io_secure_host_id.toString());
1331+
}
1332+
}
1333+
catch (const Exception & e)
13021334
{
1303-
auto & active_node_holder = it.second.second;
1304-
if (active_node_holder)
1305-
active_node_holder->setAlreadyRemoved();
1306-
active_node_holder.reset();
1335+
LOG_INFO(log, "Unable to get interserver IO address, error {}", e.what());
13071336
}
13081337

1309-
active_node_holders.clear();
1338+
createReplicaDirs(zookeeper, all_host_ids);
1339+
1340+
if (reinitialized)
1341+
{
1342+
// Reset all active_node_holders
1343+
for (auto & it : active_node_holders)
1344+
{
1345+
auto & active_node_holder = it.second.second;
1346+
if (active_node_holder)
1347+
active_node_holder->setAlreadyRemoved();
1348+
active_node_holder.reset();
1349+
}
1350+
active_node_holders.clear();
1351+
}
13101352

1311-
const auto maybe_secure_port = context->getTCPPortSecure();
1312-
const auto port = context->getTCPPort();
13131353

13141354
Coordination::Stat replicas_stat;
13151355
Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
13161356
NameSet local_host_ids;
1357+
NameSet checking_host_ids;
1358+
checking_host_ids.reserve(host_ids.size());
13171359
for (const auto & host_id : host_ids)
13181360
{
1361+
bool is_self_host = false;
13191362
try
13201363
{
13211364
HostID host = HostID::fromString(host_id);
1322-
if (DDLTask::isSelfHostID(log, host, maybe_secure_port, port))
1323-
local_host_ids.emplace(host_id);
1365+
checking_host_ids.insert(host.toString());
1366+
1367+
is_self_host = DDLTask::isSelfHostID(log, host, maybe_secure_port, port);
13241368
}
13251369
catch (const Exception & e)
13261370
{
13271371
LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
13281372
continue;
13291373
}
1374+
1375+
LOG_INFO(log, "Self host_id ({}) = {}", host_id, is_self_host);
1376+
if (is_self_host)
1377+
{
1378+
local_host_ids.emplace(host_id);
1379+
continue;
1380+
}
1381+
1382+
if (!reinitialized)
1383+
{
1384+
/// Remove this host_id from active_node_holders
1385+
auto it = active_node_holders.find(host_id);
1386+
if (it != active_node_holders.end())
1387+
{
1388+
auto & active_node_holder = it->second.second;
1389+
if (active_node_holder)
1390+
active_node_holder->setAlreadyRemoved();
1391+
active_node_holder.reset();
1392+
1393+
active_node_holders.erase(it);
1394+
}
1395+
continue;
1396+
}
13301397
}
13311398

13321399
for (const auto & host_id : local_host_ids)
@@ -1379,25 +1446,17 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
13791446
}
13801447
else
13811448
{
1382-
LOG_DEBUG(log, "Marked a replica active: active_path={}, active_id={}", active_path, active_id);
1449+
LOG_INFO(log, "Marked a replica active: active_path={}, active_id={}", active_path, active_id);
13831450
}
13841451

13851452
auto active_node_holder_zookeeper = zookeeper;
13861453
auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
13871454
active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
13881455
}
13891456

1390-
if (active_node_holders.empty())
13911457
{
1392-
for (const auto & it : context->getClusters())
1393-
{
1394-
const auto & cluster = it.second;
1395-
if (!cluster->getHostIDs().empty())
1396-
{
1397-
LOG_WARNING(log, "There are clusters with host ids but no local host found for this replica.");
1398-
break;
1399-
}
1400-
}
1458+
std::lock_guard lock{checked_host_id_set_mutex};
1459+
checked_host_id_set = checking_host_ids;
14011460
}
14021461
}
14031462

@@ -1464,4 +1523,16 @@ void DDLWorker::runCleanupThread()
14641523
}
14651524
}
14661525

1526+
NameSet DDLWorker::getAllHostIDsFromClusters() const
1527+
{
1528+
NameSet host_id_set;
1529+
for (const auto & it : context->getClusters())
1530+
{
1531+
auto cluster = it.second;
1532+
for (const auto & host_ids : cluster->getHostIDs())
1533+
for (const auto & host_id : host_ids)
1534+
host_id_set.emplace(host_id);
1535+
}
1536+
return host_id_set;
1537+
}
14671538
}

src/Interpreters/DDLWorker.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
#pragma once
22

3+
#include <Core/Names.h>
4+
#include <Interpreters/Context_fwd.h>
5+
#include <Interpreters/DDLTask.h>
36
#include <Parsers/IAST_fwd.h>
47
#include <Storages/IStorage_fwd.h>
8+
#include <Poco/Event.h>
59
#include <Common/CurrentMetrics.h>
610
#include <Common/CurrentThread.h>
711
#include <Common/DNSResolver.h>
812
#include <Common/SharedMutex.h>
913
#include <Common/ThreadPool_fwd.h>
1014
#include <Common/ZooKeeper/IKeeper.h>
1115
#include <Common/ZooKeeper/ZooKeeper.h>
12-
#include <Interpreters/Context_fwd.h>
13-
#include <Poco/Event.h>
1416

1517
#include <atomic>
1618
#include <list>
@@ -94,6 +96,9 @@ class DDLWorker
9496
/// Should be called in `initializeMainThread` only, so if it is expired, `runMainThread` will reinitialized the state.
9597
ZooKeeperPtr getAndSetZooKeeper();
9698

99+
void notifyHostIDsUpdated();
100+
void updateHostIDs(const std::vector<HostID> & hosts);
101+
97102
protected:
98103

99104
class ConcurrentSet
@@ -173,6 +178,8 @@ class DDLWorker
173178
void runMainThread();
174179
void runCleanupThread();
175180

181+
NameSet getAllHostIDsFromClusters() const;
182+
176183
ContextMutablePtr context;
177184
LoggerPtr log;
178185

@@ -209,6 +216,7 @@ class DDLWorker
209216

210217
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
211218
Int64 cleanup_delay_period = 60; // minute (in seconds)
219+
std::atomic_bool host_ids_updated{false};
212220
/// Delete node if its age is greater than that
213221
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
214222
/// How many tasks could be in the queue
@@ -221,6 +229,10 @@ class DDLWorker
221229
std::atomic_uint64_t subsequent_errors_count = 0;
222230
String last_unexpected_error;
223231

232+
mutable std::mutex checked_host_id_set_mutex;
233+
NameSet checked_host_id_set TSA_GUARDED_BY(checked_host_id_set_mutex);
234+
235+
224236
const CurrentMetrics::Metric * max_entry_metric;
225237
const CurrentMetrics::Metric * max_pushed_entry_metric;
226238

src/Interpreters/DistributedQueryStatusSource.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ NameSet DistributedQueryStatusSource::getOfflineHosts(const NameSet & hosts_to_w
102102
if (offline.size() == hosts_to_wait.size())
103103
{
104104
/// Avoid reporting that all hosts are offline
105-
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
105+
LOG_WARNING(
106+
log, "Did not find active hosts, will wait for all hosts: {}. This should not happen often", fmt::join(hosts_to_wait, ", "));
106107
return {};
107108
}
108109

src/Interpreters/executeDDLQueryOnCluster.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
200200
entry.initiator_user = context->getUserName();
201201
entry.initiator_user_roles = context->getAccessControl().tryReadNames(context->getCurrentRoles());
202202
}
203+
ddl_worker.updateHostIDs(entry.hosts);
203204
String node_path = ddl_worker.enqueueQuery(entry, params.retries_info);
204205

205206
return getDDLOnClusterStatus(node_path, ddl_worker.getReplicasDir(), entry, context);

tests/integration/test_distributed_ddl_on_database_cluster/__init__.py

Whitespace-only changes.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
<clickhouse>
2+
<remote_servers>
3+
</remote_servers>
4+
</clickhouse>

0 commit comments

Comments
 (0)