|
2 | 2 | #include <Core/ServerSettings.h> |
3 | 3 | #include <Core/ServerUUID.h> |
4 | 4 | #include <Core/Settings.h> |
| 5 | +#include <Databases/DatabaseReplicated.h> |
5 | 6 | #include <IO/NullWriteBuffer.h> |
6 | 7 | #include <IO/ReadBufferFromString.h> |
7 | 8 | #include <IO/ReadHelpers.h> |
8 | 9 | #include <IO/WriteHelpers.h> |
9 | 10 | #include <Interpreters/Cluster.h> |
10 | 11 | #include <Interpreters/Context.h> |
11 | | -#include <Interpreters/DDLTask.h> |
12 | 12 | #include <Interpreters/DDLWorker.h> |
13 | 13 | #include <Interpreters/DatabaseCatalog.h> |
14 | 14 | #include <Interpreters/ZooKeeperLog.h> |
@@ -189,6 +189,26 @@ ZooKeeperPtr DDLWorker::getAndSetZooKeeper() |
189 | 189 | return current_zookeeper; |
190 | 190 | } |
191 | 191 |
|
| 192 | +void DDLWorker::notifyHostIDsUpdated() |
| 193 | +{ |
| 194 | + LOG_INFO(log, "Host IDs updated"); |
| 195 | + host_ids_updated = true; |
| 196 | +} |
| 197 | + |
| 198 | +void DDLWorker::updateHostIDs(const std::vector<HostID> & hosts) |
| 199 | +{ |
| 200 | + std::lock_guard lock{checked_host_id_set_mutex}; |
| 201 | + for (const auto & host : hosts) |
| 202 | + { |
| 203 | + if (!checked_host_id_set.contains(host.toString())) |
| 204 | + { |
| 205 | + LOG_INFO(log, "Found new host ID: {}", host.toString()); |
| 206 | + notifyHostIDsUpdated(); |
| 207 | + return; |
| 208 | + } |
| 209 | + } |
| 210 | +} |
| 211 | + |
192 | 212 |
|
193 | 213 | DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper, bool /*dry_run*/) |
194 | 214 | { |
@@ -224,6 +244,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r |
224 | 244 | { |
225 | 245 | /// Stage 1: parse entry |
226 | 246 | task->entry.parse(node_data); |
| 247 | + updateHostIDs(task->entry.hosts); |
227 | 248 | } |
228 | 249 | catch (...) |
229 | 250 | { |
@@ -1142,7 +1163,7 @@ bool DDLWorker::initializeMainThread() |
1142 | 1163 | auto zookeeper = getAndSetZooKeeper(); |
1143 | 1164 | zookeeper->createAncestors(fs::path(queue_dir) / ""); |
1144 | 1165 | initializeReplication(); |
1145 | | - markReplicasActive(true); |
| 1166 | + markReplicasActive(/*reinitialized=*/true); |
1146 | 1167 | initialized = true; |
1147 | 1168 | return true; |
1148 | 1169 | } |
@@ -1197,23 +1218,26 @@ void DDLWorker::runMainThread() |
1197 | 1218 |
|
1198 | 1219 |
|
1199 | 1220 | DB::setThreadName(ThreadName::DDL_WORKER); |
1200 | | - LOG_DEBUG(log, "Starting DDLWorker thread"); |
1201 | | - |
| 1221 | + LOG_INFO(log, "Starting DDLWorker thread"); |
1202 | 1222 | while (!stop_flag) |
1203 | 1223 | { |
1204 | 1224 | try |
1205 | 1225 | { |
1206 | 1226 | bool reinitialized = !initialized; |
1207 | 1227 |
|
1208 | 1228 | /// Reinitialize DDLWorker state (including ZooKeeper connection) if required |
1209 | | - if (!initialized) |
| 1229 | + if (reinitialized) |
1210 | 1230 | { |
1211 | 1231 | /// Stopped |
1212 | 1232 | if (!initializeMainThread()) |
1213 | 1233 | break; |
| 1234 | + |
1214 | 1235 | LOG_DEBUG(log, "Initialized DDLWorker thread"); |
1215 | 1236 | } |
1216 | 1237 |
|
| 1238 | + if (host_ids_updated.exchange(false)) |
| 1239 | + markReplicasActive(/*reinitialized=*/false); |
| 1240 | + |
1217 | 1241 | cleanup_event->set(); |
1218 | 1242 | scheduleTasks(reinitialized); |
1219 | 1243 | subsequent_errors_count = 0; |
@@ -1272,61 +1296,104 @@ void DDLWorker::runMainThread() |
1272 | 1296 | void DDLWorker::initializeReplication() |
1273 | 1297 | { |
1274 | 1298 | auto zookeeper = getZooKeeper(); |
1275 | | - |
1276 | 1299 | zookeeper->createAncestors(fs::path(replicas_dir) / ""); |
1277 | | - |
1278 | | - NameSet host_id_set; |
1279 | | - for (const auto & it : context->getClusters()) |
1280 | | - { |
1281 | | - auto cluster = it.second; |
1282 | | - for (const auto & host_ids : cluster->getHostIDs()) |
1283 | | - for (const auto & host_id : host_ids) |
1284 | | - host_id_set.emplace(host_id); |
1285 | | - } |
1286 | | - |
1287 | | - createReplicaDirs(zookeeper, host_id_set); |
1288 | 1300 | } |
1289 | 1301 |
|
1290 | 1302 | void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids) |
1291 | 1303 | { |
1292 | 1304 | for (const auto & host_id : host_ids) |
| 1305 | + { |
| 1306 | + LOG_INFO(log, "Creating replica dir for host id {}", host_id); |
1293 | 1307 | zookeeper->createAncestors(fs::path(replicas_dir) / host_id / ""); |
| 1308 | + } |
1294 | 1309 | } |
1295 | 1310 |
|
1296 | | -void DDLWorker::markReplicasActive(bool /*reinitialized*/) |
| 1311 | +void DDLWorker::markReplicasActive(bool reinitialized) |
1297 | 1312 | { |
1298 | 1313 | auto zookeeper = getZooKeeper(); |
| 1314 | + const auto maybe_secure_port = context->getTCPPortSecure(); |
| 1315 | + const auto port = context->getTCPPort(); |
1299 | 1316 |
|
1300 | | - // Reset all active_node_holders |
1301 | | - for (auto & it : active_node_holders) |
| 1317 | + auto all_host_ids = getAllHostIDsFromClusters(); |
| 1318 | + |
| 1319 | + // Add interserver IO host IDs for Replicated DBs |
| 1320 | + try |
| 1321 | + { |
| 1322 | + auto host_port = context->getInterserverIOAddress(); |
| 1323 | + HostID interserver_io_host_id = {host_port.first, port}; |
| 1324 | + all_host_ids.emplace(interserver_io_host_id.toString()); |
| 1325 | + LOG_INFO(log, "Add interserver IO host ID {}", interserver_io_host_id.toString()); |
| 1326 | + if (maybe_secure_port) |
| 1327 | + { |
| 1328 | + HostID interserver_io_secure_host_id = {host_port.first, *maybe_secure_port}; |
| 1329 | + all_host_ids.emplace(interserver_io_secure_host_id.toString()); |
| 1330 | + LOG_INFO(log, "Add interserver IO secure host ID {}", interserver_io_secure_host_id.toString()); |
| 1331 | + } |
| 1332 | + } |
| 1333 | + catch (const Exception & e) |
1302 | 1334 | { |
1303 | | - auto & active_node_holder = it.second.second; |
1304 | | - if (active_node_holder) |
1305 | | - active_node_holder->setAlreadyRemoved(); |
1306 | | - active_node_holder.reset(); |
| 1335 | + LOG_INFO(log, "Unable to get interserver IO address, error {}", e.what()); |
1307 | 1336 | } |
1308 | 1337 |
|
1309 | | - active_node_holders.clear(); |
| 1338 | + createReplicaDirs(zookeeper, all_host_ids); |
| 1339 | + |
| 1340 | + if (reinitialized) |
| 1341 | + { |
| 1342 | + // Reset all active_node_holders |
| 1343 | + for (auto & it : active_node_holders) |
| 1344 | + { |
| 1345 | + auto & active_node_holder = it.second.second; |
| 1346 | + if (active_node_holder) |
| 1347 | + active_node_holder->setAlreadyRemoved(); |
| 1348 | + active_node_holder.reset(); |
| 1349 | + } |
| 1350 | + active_node_holders.clear(); |
| 1351 | + } |
1310 | 1352 |
|
1311 | | - const auto maybe_secure_port = context->getTCPPortSecure(); |
1312 | | - const auto port = context->getTCPPort(); |
1313 | 1353 |
|
1314 | 1354 | Coordination::Stat replicas_stat; |
1315 | 1355 | Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat); |
1316 | 1356 | NameSet local_host_ids; |
| 1357 | + NameSet checking_host_ids; |
| 1358 | + checking_host_ids.reserve(host_ids.size()); |
1317 | 1359 | for (const auto & host_id : host_ids) |
1318 | 1360 | { |
| 1361 | + bool is_self_host = false; |
1319 | 1362 | try |
1320 | 1363 | { |
1321 | 1364 | HostID host = HostID::fromString(host_id); |
1322 | | - if (DDLTask::isSelfHostID(log, host, maybe_secure_port, port)) |
1323 | | - local_host_ids.emplace(host_id); |
| 1365 | + checking_host_ids.insert(host.toString()); |
| 1366 | + |
| 1367 | + is_self_host = DDLTask::isSelfHostID(log, host, maybe_secure_port, port); |
1324 | 1368 | } |
1325 | 1369 | catch (const Exception & e) |
1326 | 1370 | { |
1327 | 1371 | LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText()); |
1328 | 1372 | continue; |
1329 | 1373 | } |
| 1374 | + |
| 1375 | + LOG_INFO(log, "Self host_id ({}) = {}", host_id, is_self_host); |
| 1376 | + if (is_self_host) |
| 1377 | + { |
| 1378 | + local_host_ids.emplace(host_id); |
| 1379 | + continue; |
| 1380 | + } |
| 1381 | + |
| 1382 | + if (!reinitialized) |
| 1383 | + { |
| 1384 | + /// Remove this host_id from active_node_holders |
| 1385 | + auto it = active_node_holders.find(host_id); |
| 1386 | + if (it != active_node_holders.end()) |
| 1387 | + { |
| 1388 | + auto & active_node_holder = it->second.second; |
| 1389 | + if (active_node_holder) |
| 1390 | + active_node_holder->setAlreadyRemoved(); |
| 1391 | + active_node_holder.reset(); |
| 1392 | + |
| 1393 | + active_node_holders.erase(it); |
| 1394 | + } |
| 1395 | + continue; |
| 1396 | + } |
1330 | 1397 | } |
1331 | 1398 |
|
1332 | 1399 | for (const auto & host_id : local_host_ids) |
@@ -1379,25 +1446,17 @@ void DDLWorker::markReplicasActive(bool /*reinitialized*/) |
1379 | 1446 | } |
1380 | 1447 | else |
1381 | 1448 | { |
1382 | | - LOG_DEBUG(log, "Marked a replica active: active_path={}, active_id={}", active_path, active_id); |
| 1449 | + LOG_INFO(log, "Marked a replica active: active_path={}, active_id={}", active_path, active_id); |
1383 | 1450 | } |
1384 | 1451 |
|
1385 | 1452 | auto active_node_holder_zookeeper = zookeeper; |
1386 | 1453 | auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper); |
1387 | 1454 | active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder}; |
1388 | 1455 | } |
1389 | 1456 |
|
1390 | | - if (active_node_holders.empty()) |
1391 | 1457 | { |
1392 | | - for (const auto & it : context->getClusters()) |
1393 | | - { |
1394 | | - const auto & cluster = it.second; |
1395 | | - if (!cluster->getHostIDs().empty()) |
1396 | | - { |
1397 | | - LOG_WARNING(log, "There are clusters with host ids but no local host found for this replica."); |
1398 | | - break; |
1399 | | - } |
1400 | | - } |
| 1458 | + std::lock_guard lock{checked_host_id_set_mutex}; |
| 1459 | + checked_host_id_set = checking_host_ids; |
1401 | 1460 | } |
1402 | 1461 | } |
1403 | 1462 |
|
@@ -1464,4 +1523,16 @@ void DDLWorker::runCleanupThread() |
1464 | 1523 | } |
1465 | 1524 | } |
1466 | 1525 |
|
| 1526 | +NameSet DDLWorker::getAllHostIDsFromClusters() const |
| 1527 | +{ |
| 1528 | + NameSet host_id_set; |
| 1529 | + for (const auto & it : context->getClusters()) |
| 1530 | + { |
| 1531 | + auto cluster = it.second; |
| 1532 | + for (const auto & host_ids : cluster->getHostIDs()) |
| 1533 | + for (const auto & host_id : host_ids) |
| 1534 | + host_id_set.emplace(host_id); |
| 1535 | + } |
| 1536 | + return host_id_set; |
| 1537 | +} |
1467 | 1538 | } |
0 commit comments