Skip to content

Commit f7cd4d4

Browse files
Backport #78541 to 25.3: s3queue: fix logical error "Cannot unregister: table uuid is not registered"
1 parent b2e25c8 commit f7cd4d4

5 files changed

Lines changed: 156 additions & 74 deletions

File tree

src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp

Lines changed: 128 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -577,13 +577,18 @@ void ObjectStorageQueueMetadata::registerNonActive(const StorageID & storage_id)
577577
{
578578
const auto registry_path = zookeeper_path / "registry";
579579
const auto self = Info::create(storage_id);
580+
const auto drop_lock_path = zookeeper_path / "drop";
580581

581582
Coordination::Error code;
582583
for (size_t i = 0; i < 1000; ++i)
583584
{
584585
Coordination::Stat stat;
585586
std::string registry_str;
586587
auto zk_client = getZooKeeper();
588+
bool supports_remove_recursive = zk_client->isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE);
589+
590+
Coordination::Requests requests;
591+
Coordination::Responses responses;
587592

588593
if (zk_client->tryGet(registry_path, registry_str, &stat))
589594
{
@@ -604,14 +609,20 @@ void ObjectStorageQueueMetadata::registerNonActive(const StorageID & storage_id)
604609
}
605610

606611
auto new_registry_str = registry_str + "," + self.serialize();
607-
code = zk_client->trySet(registry_path, new_registry_str, stat.version);
612+
requests.push_back(zkutil::makeSetRequest(registry_path, new_registry_str, stat.version));
608613
}
609614
else
610-
code = zk_client->tryCreate(
615+
{
616+
requests.push_back(zkutil::makeCreateRequest(
611617
registry_path,
612618
self.serialize(),
613-
zkutil::CreateMode::Persistent);
619+
zkutil::CreateMode::Persistent));
614620

621+
if (!supports_remove_recursive)
622+
zkutil::addCheckNotExistsRequest(requests, *zk_client, drop_lock_path);
623+
}
624+
625+
code = zk_client->tryMulti(requests, responses);
615626
if (code == Coordination::Error::ZOK)
616627
{
617628
LOG_TRACE(log, "Added {} to registry", self.table_id);
@@ -648,12 +659,12 @@ Strings ObjectStorageQueueMetadata::getRegistered(bool active)
648659
return registered;
649660
}
650661

651-
size_t ObjectStorageQueueMetadata::unregister(const StorageID & storage_id, bool active)
662+
size_t ObjectStorageQueueMetadata::unregister(const StorageID & storage_id, bool active, bool remove_metadata_if_no_registered)
652663
{
653664
if (active)
654665
return unregisterActive(storage_id);
655666
else
656-
return unregisterNonActive(storage_id);
667+
return unregisterNonActive(storage_id, remove_metadata_if_no_registered);
657668
}
658669

659670
size_t ObjectStorageQueueMetadata::unregisterActive(const StorageID & storage_id)
@@ -683,63 +694,145 @@ size_t ObjectStorageQueueMetadata::unregisterActive(const StorageID & storage_id
683694
return remaining_nodes_num;
684695
}
685696

686-
size_t ObjectStorageQueueMetadata::unregisterNonActive(const StorageID & storage_id)
697+
size_t ObjectStorageQueueMetadata::unregisterNonActive(const StorageID & storage_id, bool remove_metadata_if_no_registered)
687698
{
688699
const auto registry_path = zookeeper_path / "registry";
700+
const auto drop_lock_path = zookeeper_path / "drop";
689701
const auto self = Info::create(storage_id);
690702

691703
Coordination::Error code = Coordination::Error::ZOK;
704+
692705
for (size_t i = 0; i < 1000; ++i)
693706
{
694-
Coordination::Stat stat;
695-
std::string registry_str;
696-
auto zk_client = getZooKeeper();
707+
Coordination::Requests requests;
708+
Coordination::Responses responses;
709+
size_t count = 0;
710+
711+
bool supports_remove_recursive = true;
712+
zkutil::ZooKeeperPtr zk_client;
697713

698-
bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat);
699-
if (!node_exists)
714+
try
700715
{
701-
LOG_WARNING(log, "Cannot unregister: registry does not exist");
702-
chassert(false);
703-
return 0;
704-
}
716+
zk_client = getZooKeeper();
717+
supports_remove_recursive = zk_client->isFeatureEnabled(DB::KeeperFeatureFlag::REMOVE_RECURSIVE);
705718

706-
Strings registered;
707-
splitInto<','>(registered, registry_str);
719+
Coordination::Stat stat;
720+
std::string registry_str;
721+
bool node_exists = zk_client->tryGet(registry_path, registry_str, &stat);
722+
if (!node_exists)
723+
{
724+
LOG_WARNING(log, "Cannot unregister: registry does not exist");
725+
chassert(false);
726+
return 0;
727+
}
708728

709-
bool found = false;
710-
std::string new_registry_str;
711-
size_t count = 0;
712-
for (const auto & elem : registered)
713-
{
714-
if (elem.empty())
715-
continue;
729+
Strings registered;
730+
splitInto<','>(registered, registry_str);
731+
732+
bool found = false;
733+
std::string new_registry_str;
734+
for (const auto & elem : registered)
735+
{
736+
if (elem.empty())
737+
continue;
738+
739+
auto info = Info::deserialize(elem);
740+
if (info == self)
741+
found = true;
742+
else
743+
{
744+
if (!new_registry_str.empty())
745+
new_registry_str += ",";
746+
new_registry_str += elem;
747+
count += 1;
748+
}
749+
}
750+
if (!found)
751+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister: table '{}' is not registered", self.table_id);
716752

717-
auto info = Info::deserialize(elem);
718-
if (info == self)
719-
found = true;
753+
if (remove_metadata_if_no_registered && count == 0)
754+
{
755+
LOG_TRACE(log, "Removing all metadata in keeper by path: {}", zookeeper_path.string());
756+
if (supports_remove_recursive)
757+
{
758+
requests.push_back(zkutil::makeCheckRequest(registry_path, stat.version));
759+
requests.push_back(zkutil::makeRemoveRecursiveRequest(zookeeper_path, std::numeric_limits<uint32_t>::max()));
760+
}
761+
else
762+
{
763+
requests.push_back(zkutil::makeCheckRequest(registry_path, stat.version));
764+
requests.push_back(zkutil::makeCreateRequest(drop_lock_path, "", zkutil::CreateMode::Ephemeral));
765+
}
766+
code = zk_client->tryMulti(requests, responses);
767+
}
720768
else
721769
{
722-
if (!new_registry_str.empty())
723-
new_registry_str += ",";
724-
new_registry_str += elem;
725-
count += 1;
770+
code = zk_client->trySet(registry_path, new_registry_str, stat.version);
726771
}
727772
}
728-
if (!found)
729-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unregister: table '{}' is not registered", self.table_id);
730-
731-
code = zk_client->trySet(registry_path, new_registry_str, stat.version);
773+
catch (const zkutil::KeeperMultiException & e)
774+
{
775+
if (Coordination::isHardwareError(e.code))
776+
{
777+
LOG_TEST(log, "Lost connection to zookeeper, will retry");
778+
continue;
779+
}
780+
throw;
781+
}
782+
catch (const zkutil::KeeperException & e)
783+
{
784+
if (Coordination::isHardwareError(e.code))
785+
{
786+
LOG_TEST(log, "Lost connection to zookeeper, will retry");
787+
continue;
788+
}
789+
throw;
790+
}
732791

733792
if (code == Coordination::Error::ZOK)
734793
{
735794
LOG_TRACE(log, "Table '{}' has been removed from the registry", self.table_id);
795+
796+
if (!supports_remove_recursive && remove_metadata_if_no_registered && count == 0)
797+
{
798+
/// Take a drop lock and do recursive remove as a separate request.
799+
/// In case of unsupported "remove_recursive" feature, it will
800+
/// do getChildren and remove them one by one.
801+
auto drop_lock = zkutil::EphemeralNodeHolder::existing(drop_lock_path, *zk_client);
802+
try
803+
{
804+
zk_client->removeRecursive(zookeeper_path);
805+
}
806+
catch (const zkutil::KeeperMultiException & e)
807+
{
808+
if (Coordination::isHardwareError(e.code))
809+
{
810+
LOG_TEST(log, "Lost connection to zookeeper, will retry");
811+
continue;
812+
}
813+
throw;
814+
}
815+
catch (const zkutil::KeeperException & e)
816+
{
817+
if (Coordination::isHardwareError(e.code))
818+
{
819+
LOG_TEST(log, "Lost connection to zookeeper, will retry");
820+
continue;
821+
}
822+
throw;
823+
}
824+
}
736825
return count;
737826
}
738827

739828
if (Coordination::isHardwareError(code)
740829
|| code == Coordination::Error::ZBADVERSION)
741830
continue;
742831

832+
if (!responses.empty())
833+
{
834+
zkutil::KeeperMultiException::check(code, requests, responses);
835+
}
743836
throw zkutil::KeeperException(code);
744837
}
745838

src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class ObjectStorageQueueMetadata
125125
void registerIfNot(const StorageID & storage_id, bool active);
126126
/// Unregister table.
127127
/// Return the number of remaining (after unregistering) registered tables.
128-
size_t unregister(const StorageID & storage_id, bool active);
128+
size_t unregister(const StorageID & storage_id, bool active, bool remove_metadata_if_no_registered);
129129
Strings getRegistered(bool active);
130130

131131
/// According to current *active* registered tables,
@@ -155,7 +155,7 @@ class ObjectStorageQueueMetadata
155155
void registerNonActive(const StorageID & storage_id);
156156
void registerActive(const StorageID & storage_id);
157157

158-
size_t unregisterNonActive(const StorageID & storage_id);
158+
size_t unregisterNonActive(const StorageID & storage_id, bool remove_metadata_if_no_registered);
159159
size_t unregisterActive(const StorageID & storage_id);
160160

161161
void updateRegistryFunc();

src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -49,37 +49,18 @@ void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_pat
4949

5050
*it->second.ref_count -= 1;
5151

52-
size_t registry_size;
5352
try
5453
{
55-
registry_size = it->second.metadata->unregister(storage_id, false);
54+
const auto registry_size = it->second.metadata->unregister(
55+
storage_id,
56+
/* active */false,
57+
/* remove_all_metadata_if_no_registered */true);
58+
5659
LOG_TRACE(log, "Remaining registry size: {}", registry_size);
5760
}
58-
catch (const zkutil::KeeperException & e)
59-
{
60-
if (!Coordination::isHardwareError(e.code))
61-
{
62-
tryLogCurrentException(__PRETTY_FUNCTION__);
63-
}
64-
/// Any non-zero value would do.
65-
registry_size = 1;
66-
}
6761
catch (...)
6862
{
6963
tryLogCurrentException(__PRETTY_FUNCTION__);
70-
/// Any non-zero value would do.
71-
registry_size = 1;
72-
}
73-
74-
if (registry_size == 0)
75-
{
76-
auto zk_client = Context::getGlobalContextInstance()->getZooKeeper();
77-
auto code = zk_client->tryRemoveRecursive(it->first);
78-
if (code != Coordination::Error::ZOK
79-
&& !Coordination::isHardwareError(code))
80-
{
81-
LOG_ERROR(log, "Unexpected error while removing metadata: {}, path: {}", code, it->first);
82-
}
8364
}
8465

8566
if (!it->second.ref_count)

src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ void StorageObjectStorageQueue::shutdown(bool is_drop)
292292
{
293293
try
294294
{
295-
files_metadata->unregister(getStorageID(), /* active */true);
295+
files_metadata->unregister(getStorageID(), /* active */true, /* remove_metadata_if_no_registered */false);
296296
}
297297
catch (...)
298298
{
@@ -543,7 +543,7 @@ void StorageObjectStorageQueue::threadFunc()
543543
{
544544
try
545545
{
546-
files_metadata->unregister(storage_id, /* active */true);
546+
files_metadata->unregister(storage_id, /* active */true, /* remove_metadata_if_no_registered */false);
547547
}
548548
catch (...)
549549
{

tests/integration/test_storage_s3_queue/test_4.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,23 @@ def test_replicated(started_cluster):
7979
f"CREATE DATABASE {db_name} ENGINE=Replicated('/clickhouse/databases/replicateddb', 'shard1', 'node2')"
8080
)
8181

82-
create_table(
83-
started_cluster,
84-
node1,
85-
table_name,
86-
"ordered",
87-
files_path,
88-
additional_settings={
89-
"keeper_path": keeper_path,
90-
},
91-
database_name="r",
92-
)
82+
def do_create_table():
83+
create_table(
84+
started_cluster,
85+
node1,
86+
table_name,
87+
"ordered",
88+
files_path,
89+
additional_settings={
90+
"processing_threads_num": 16,
91+
"keeper_path": keeper_path,
92+
},
93+
database_name="r",
94+
)
95+
96+
do_create_table()
97+
node1.query(f"DROP TABLE r.{table_name} SYNC")
98+
do_create_table()
9399

94100
assert '"processing_threads_num":16' in node1.query(
95101
f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
@@ -115,6 +121,8 @@ def get_count():
115121
time.sleep(1)
116122
assert expected_rows == get_count()
117123

124+
node1.query(f"DROP TABLE {db_name}.{table_name} SYNC")
125+
118126

119127
def test_bad_settings(started_cluster):
120128
node = started_cluster.instances["node_cloud_mode"]

0 commit comments

Comments
 (0)