22#include < Core/ServerSettings.h>
33#include < Core/ServerUUID.h>
44#include < Core/Settings.h>
5+ #include < Databases/DatabaseReplicated.h>
56#include < IO/NullWriteBuffer.h>
67#include < IO/ReadBufferFromString.h>
78#include < IO/ReadHelpers.h>
89#include < IO/WriteHelpers.h>
910#include < Interpreters/Cluster.h>
1011#include < Interpreters/Context.h>
11- #include < Interpreters/DDLTask.h>
1212#include < Interpreters/DDLWorker.h>
1313#include < Interpreters/DatabaseCatalog.h>
1414#include < Interpreters/ZooKeeperLog.h>
@@ -188,6 +188,26 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
188188 return current_zookeeper;
189189}
190190
191+ void DDLWorker::notifyHostIDsUpdated ()
192+ {
193+ LOG_INFO (log, " Host IDs updated" );
194+ host_ids_updated = true ;
195+ }
196+
197+ void DDLWorker::updateHostIDs (const std::vector<HostID> & hosts)
198+ {
199+ std::lock_guard lock{checked_host_id_set_mutex};
200+ for (const auto & host : hosts)
201+ {
202+ if (!checked_host_id_set.contains (host.toString ()))
203+ {
204+ LOG_INFO (log, " Found new host ID: {}" , host.toString ());
205+ notifyHostIDsUpdated ();
206+ return ;
207+ }
208+ }
209+ }
210+
191211
192212DDLTaskPtr DDLWorker::initAndCheckTask (const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool /* dry_run*/ )
193213{
@@ -223,6 +243,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
223243 {
224244 // / Stage 1: parse entry
225245 task->entry .parse (node_data);
246+ updateHostIDs (task->entry .hosts );
226247 }
227248 catch (...)
228249 {
@@ -1139,7 +1160,7 @@ bool DDLWorker::initializeMainThread()
11391160 auto zookeeper = getAndSetZooKeeper ();
11401161 zookeeper->createAncestors (fs::path (queue_dir) / " " );
11411162 initializeReplication ();
1142- markReplicasActive (true );
1163+ markReplicasActive (/* reinitialized= */ true );
11431164 initialized = true ;
11441165 return true ;
11451166 }
@@ -1194,7 +1215,7 @@ void DDLWorker::runMainThread()
11941215
11951216
11961217 setThreadName (" DDLWorker" );
1197- LOG_DEBUG (log, " Starting DDLWorker thread" );
1218+ LOG_INFO (log, " Starting DDLWorker thread" );
11981219
11991220 while (!stop_flag)
12001221 {
@@ -1203,14 +1224,18 @@ void DDLWorker::runMainThread()
12031224 bool reinitialized = !initialized;
12041225
12051226 // / Reinitialize DDLWorker state (including ZooKeeper connection) if required
1206- if (!initialized )
1227+ if (reinitialized )
12071228 {
12081229 // / Stopped
12091230 if (!initializeMainThread ())
12101231 break ;
1232+
12111233 LOG_DEBUG (log, " Initialized DDLWorker thread" );
12121234 }
12131235
1236+ if (host_ids_updated.exchange (false ))
1237+ markReplicasActive (/* reinitialized=*/ false );
1238+
12141239 cleanup_event->set ();
12151240 scheduleTasks (reinitialized);
12161241 subsequent_errors_count = 0 ;
@@ -1269,61 +1294,104 @@ void DDLWorker::runMainThread()
12691294void DDLWorker::initializeReplication ()
12701295{
12711296 auto zookeeper = getZooKeeper ();
1272-
12731297 zookeeper->createAncestors (fs::path (replicas_dir) / " " );
1274-
1275- NameSet host_id_set;
1276- for (const auto & it : context->getClusters ())
1277- {
1278- auto cluster = it.second ;
1279- for (const auto & host_ids : cluster->getHostIDs ())
1280- for (const auto & host_id : host_ids)
1281- host_id_set.emplace (host_id);
1282- }
1283-
1284- createReplicaDirs (zookeeper, host_id_set);
12851298}
12861299
12871300void DDLWorker::createReplicaDirs (const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
12881301{
12891302 for (const auto & host_id : host_ids)
1303+ {
1304+ LOG_INFO (log, " Creating replica dir for host id {}" , host_id);
12901305 zookeeper->createAncestors (fs::path (replicas_dir) / host_id / " " );
1306+ }
12911307}
12921308
1293- void DDLWorker::markReplicasActive (bool /* reinitialized*/ )
1309+ void DDLWorker::markReplicasActive (bool reinitialized)
12941310{
12951311 auto zookeeper = getZooKeeper ();
1312+ const auto maybe_secure_port = context->getTCPPortSecure ();
1313+ const auto port = context->getTCPPort ();
1314+
1315+ auto all_host_ids = getAllHostIDsFromClusters ();
12961316
1297- // Reset all active_node_holders
1298- for (auto & it : active_node_holders)
1317+ // Add interserver IO host IDs for Replicated DBs
1318+ try
1319+ {
1320+ auto host_port = context->getInterserverIOAddress ();
1321+ HostID interserver_io_host_id = {host_port.first , port};
1322+ all_host_ids.emplace (interserver_io_host_id.toString ());
1323+ LOG_INFO (log, " Add interserver IO host ID {}" , interserver_io_host_id.toString ());
1324+ if (maybe_secure_port)
1325+ {
1326+ HostID interserver_io_secure_host_id = {host_port.first , *maybe_secure_port};
1327+ all_host_ids.emplace (interserver_io_secure_host_id.toString ());
1328+ LOG_INFO (log, " Add interserver IO secure host ID {}" , interserver_io_secure_host_id.toString ());
1329+ }
1330+ }
1331+ catch (const Exception & e)
12991332 {
1300- auto & active_node_holder = it.second .second ;
1301- if (active_node_holder)
1302- active_node_holder->setAlreadyRemoved ();
1303- active_node_holder.reset ();
1333+ LOG_INFO (log, " Unable to get interserver IO address, error {}" , e.what ());
13041334 }
13051335
1306- active_node_holders.clear ();
1336+ createReplicaDirs (zookeeper, all_host_ids);
1337+
1338+ if (reinitialized)
1339+ {
1340+ // Reset all active_node_holders
1341+ for (auto & it : active_node_holders)
1342+ {
1343+ auto & active_node_holder = it.second .second ;
1344+ if (active_node_holder)
1345+ active_node_holder->setAlreadyRemoved ();
1346+ active_node_holder.reset ();
1347+ }
1348+ active_node_holders.clear ();
1349+ }
13071350
1308- const auto maybe_secure_port = context->getTCPPortSecure ();
1309- const auto port = context->getTCPPort ();
13101351
13111352 Coordination::Stat replicas_stat;
13121353 Strings host_ids = zookeeper->getChildren (replicas_dir, &replicas_stat);
13131354 NameSet local_host_ids;
1355+ NameSet checking_host_ids;
1356+ checking_host_ids.reserve (host_ids.size ());
13141357 for (const auto & host_id : host_ids)
13151358 {
1359+ bool is_self_host = false ;
13161360 try
13171361 {
13181362 HostID host = HostID::fromString (host_id);
1319- if (DDLTask::isSelfHostID (log, host, maybe_secure_port, port))
1320- local_host_ids.emplace (host_id);
1363+ checking_host_ids.insert (host.toString ());
1364+
1365+ is_self_host = DDLTask::isSelfHostID (log, host, maybe_secure_port, port);
13211366 }
13221367 catch (const Exception & e)
13231368 {
13241369 LOG_WARNING (log, " Unable to check if host {} is a local address, exception: {}" , host_id, e.displayText ());
13251370 continue ;
13261371 }
1372+
1373+ LOG_INFO (log, " Self host_id ({}) = {}" , host_id, is_self_host);
1374+ if (is_self_host)
1375+ {
1376+ local_host_ids.emplace (host_id);
1377+ continue ;
1378+ }
1379+
1380+ if (!reinitialized)
1381+ {
1382+ // / Remove this host_id from active_node_holders
1383+ auto it = active_node_holders.find (host_id);
1384+ if (it != active_node_holders.end ())
1385+ {
1386+ auto & active_node_holder = it->second .second ;
1387+ if (active_node_holder)
1388+ active_node_holder->setAlreadyRemoved ();
1389+ active_node_holder.reset ();
1390+
1391+ active_node_holders.erase (it);
1392+ }
1393+ continue ;
1394+ }
13271395 }
13281396
13291397 for (const auto & host_id : local_host_ids)
@@ -1371,17 +1439,9 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/)
13711439 active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
13721440 }
13731441
1374- if (active_node_holders.empty ())
13751442 {
1376- for (const auto & it : context->getClusters ())
1377- {
1378- const auto & cluster = it.second ;
1379- if (!cluster->getHostIDs ().empty ())
1380- {
1381- LOG_WARNING (log, " There are clusters with host ids but no local host found for this replica." );
1382- break ;
1383- }
1384- }
1443+ std::lock_guard lock{checked_host_id_set_mutex};
1444+ checked_host_id_set = checking_host_ids;
13851445 }
13861446}
13871447
@@ -1448,4 +1508,16 @@ void DDLWorker::runCleanupThread()
14481508 }
14491509}
14501510
1511+ NameSet DDLWorker::getAllHostIDsFromClusters () const
1512+ {
1513+ NameSet host_id_set;
1514+ for (const auto & it : context->getClusters ())
1515+ {
1516+ auto cluster = it.second ;
1517+ for (const auto & host_ids : cluster->getHostIDs ())
1518+ for (const auto & host_id : host_ids)
1519+ host_id_set.emplace (host_id);
1520+ }
1521+ return host_id_set;
1522+ }
14511523}
0 commit comments