@@ -577,13 +577,18 @@ void ObjectStorageQueueMetadata::registerNonActive(const StorageID & storage_id)
577577{
578578 const auto registry_path = zookeeper_path / " registry" ;
579579 const auto self = Info::create (storage_id);
580+ const auto drop_lock_path = zookeeper_path / " drop" ;
580581
581582 Coordination::Error code;
582583 for (size_t i = 0 ; i < 1000 ; ++i)
583584 {
584585 Coordination::Stat stat;
585586 std::string registry_str;
586587 auto zk_client = getZooKeeper ();
588+ bool supports_remove_recursive = zk_client->isFeatureEnabled (DB::KeeperFeatureFlag::REMOVE_RECURSIVE);
589+
590+ Coordination::Requests requests;
591+ Coordination::Responses responses;
587592
588593 if (zk_client->tryGet (registry_path, registry_str, &stat))
589594 {
@@ -604,14 +609,20 @@ void ObjectStorageQueueMetadata::registerNonActive(const StorageID & storage_id)
604609 }
605610
606611 auto new_registry_str = registry_str + " ," + self.serialize ();
607- code = zk_client-> trySet ( registry_path, new_registry_str, stat.version );
612+ requests. push_back ( zkutil::makeSetRequest ( registry_path, new_registry_str, stat.version ) );
608613 }
609614 else
610- code = zk_client->tryCreate (
615+ {
616+ requests.push_back (zkutil::makeCreateRequest (
611617 registry_path,
612618 self.serialize (),
613- zkutil::CreateMode::Persistent);
619+ zkutil::CreateMode::Persistent)) ;
614620
621+ if (!supports_remove_recursive)
622+ zkutil::addCheckNotExistsRequest (requests, *zk_client, drop_lock_path);
623+ }
624+
625+ code = zk_client->tryMulti (requests, responses);
615626 if (code == Coordination::Error::ZOK)
616627 {
617628 LOG_TRACE (log, " Added {} to registry" , self.table_id );
@@ -648,12 +659,12 @@ Strings ObjectStorageQueueMetadata::getRegistered(bool active)
648659 return registered;
649660}
650661
651- size_t ObjectStorageQueueMetadata::unregister (const StorageID & storage_id, bool active)
662+ size_t ObjectStorageQueueMetadata::unregister (const StorageID & storage_id, bool active, bool remove_metadata_if_no_registered )
652663{
653664 if (active)
654665 return unregisterActive (storage_id);
655666 else
656- return unregisterNonActive (storage_id);
667+ return unregisterNonActive (storage_id, remove_metadata_if_no_registered );
657668}
658669
659670size_t ObjectStorageQueueMetadata::unregisterActive (const StorageID & storage_id)
@@ -683,63 +694,145 @@ size_t ObjectStorageQueueMetadata::unregisterActive(const StorageID & storage_id
683694 return remaining_nodes_num;
684695}
685696
686- size_t ObjectStorageQueueMetadata::unregisterNonActive (const StorageID & storage_id)
697+ size_t ObjectStorageQueueMetadata::unregisterNonActive (const StorageID & storage_id, bool remove_metadata_if_no_registered )
687698{
688699 const auto registry_path = zookeeper_path / " registry" ;
700+ const auto drop_lock_path = zookeeper_path / " drop" ;
689701 const auto self = Info::create (storage_id);
690702
691703 Coordination::Error code = Coordination::Error::ZOK;
704+
692705 for (size_t i = 0 ; i < 1000 ; ++i)
693706 {
694- Coordination::Stat stat;
695- std::string registry_str;
696- auto zk_client = getZooKeeper ();
707+ Coordination::Requests requests;
708+ Coordination::Responses responses;
709+ size_t count = 0 ;
710+
711+ bool supports_remove_recursive = true ;
712+ zkutil::ZooKeeperPtr zk_client;
697713
698- bool node_exists = zk_client->tryGet (registry_path, registry_str, &stat);
699- if (!node_exists)
714+ try
700715 {
701- LOG_WARNING (log, " Cannot unregister: registry does not exist" );
702- chassert (false );
703- return 0 ;
704- }
716+ zk_client = getZooKeeper ();
717+ supports_remove_recursive = zk_client->isFeatureEnabled (DB::KeeperFeatureFlag::REMOVE_RECURSIVE);
705718
706- Strings registered;
707- splitInto<' ,' >(registered, registry_str);
719+ Coordination::Stat stat;
720+ std::string registry_str;
721+ bool node_exists = zk_client->tryGet (registry_path, registry_str, &stat);
722+ if (!node_exists)
723+ {
724+ LOG_WARNING (log, " Cannot unregister: registry does not exist" );
725+ chassert (false );
726+ return 0 ;
727+ }
708728
709- bool found = false ;
710- std::string new_registry_str;
711- size_t count = 0 ;
712- for (const auto & elem : registered)
713- {
714- if (elem.empty ())
715- continue ;
729+ Strings registered;
730+ splitInto<' ,' >(registered, registry_str);
731+
732+ bool found = false ;
733+ std::string new_registry_str;
734+ for (const auto & elem : registered)
735+ {
736+ if (elem.empty ())
737+ continue ;
738+
739+ auto info = Info::deserialize (elem);
740+ if (info == self)
741+ found = true ;
742+ else
743+ {
744+ if (!new_registry_str.empty ())
745+ new_registry_str += " ," ;
746+ new_registry_str += elem;
747+ count += 1 ;
748+ }
749+ }
750+ if (!found)
751+ throw Exception (ErrorCodes::LOGICAL_ERROR, " Cannot unregister: table '{}' is not registered" , self.table_id );
716752
717- auto info = Info::deserialize (elem);
718- if (info == self)
719- found = true ;
753+ if (remove_metadata_if_no_registered && count == 0 )
754+ {
755+ LOG_TRACE (log, " Removing all metadata in keeper by path: {}" , zookeeper_path.string ());
756+ if (supports_remove_recursive)
757+ {
758+ requests.push_back (zkutil::makeCheckRequest (registry_path, stat.version ));
759+ requests.push_back (zkutil::makeRemoveRecursiveRequest (zookeeper_path, std::numeric_limits<uint32_t >::max ()));
760+ }
761+ else
762+ {
763+ requests.push_back (zkutil::makeCheckRequest (registry_path, stat.version ));
764+ requests.push_back (zkutil::makeCreateRequest (drop_lock_path, " " , zkutil::CreateMode::Ephemeral));
765+ }
766+ code = zk_client->tryMulti (requests, responses);
767+ }
720768 else
721769 {
722- if (!new_registry_str.empty ())
723- new_registry_str += " ," ;
724- new_registry_str += elem;
725- count += 1 ;
770+ code = zk_client->trySet (registry_path, new_registry_str, stat.version );
726771 }
727772 }
728- if (!found)
729- throw Exception (ErrorCodes::LOGICAL_ERROR, " Cannot unregister: table '{}' is not registered" , self.table_id );
730-
731- code = zk_client->trySet (registry_path, new_registry_str, stat.version );
773+ catch (const zkutil::KeeperMultiException & e)
774+ {
775+ if (Coordination::isHardwareError (e.code ))
776+ {
777+ LOG_TEST (log, " Lost connection to zookeeper, will retry" );
778+ continue ;
779+ }
780+ throw ;
781+ }
782+ catch (const zkutil::KeeperException & e)
783+ {
784+ if (Coordination::isHardwareError (e.code ))
785+ {
786+ LOG_TEST (log, " Lost connection to zookeeper, will retry" );
787+ continue ;
788+ }
789+ throw ;
790+ }
732791
733792 if (code == Coordination::Error::ZOK)
734793 {
735794 LOG_TRACE (log, " Table '{}' has been removed from the registry" , self.table_id );
795+
796+ if (!supports_remove_recursive && remove_metadata_if_no_registered && count == 0 )
797+ {
798+ // / Take a drop lock and do recursive remove as a separate request.
799+ // / In case of unsupported "remove_recursive" feature, it will
800+ // / do getChildren and remove them one by one.
801+ auto drop_lock = zkutil::EphemeralNodeHolder::existing (drop_lock_path, *zk_client);
802+ try
803+ {
804+ zk_client->removeRecursive (zookeeper_path);
805+ }
806+ catch (const zkutil::KeeperMultiException & e)
807+ {
808+ if (Coordination::isHardwareError (e.code ))
809+ {
810+ LOG_TEST (log, " Lost connection to zookeeper, will retry" );
811+ continue ;
812+ }
813+ throw ;
814+ }
815+ catch (const zkutil::KeeperException & e)
816+ {
817+ if (Coordination::isHardwareError (e.code ))
818+ {
819+ LOG_TEST (log, " Lost connection to zookeeper, will retry" );
820+ continue ;
821+ }
822+ throw ;
823+ }
824+ }
736825 return count;
737826 }
738827
739828 if (Coordination::isHardwareError (code)
740829 || code == Coordination::Error::ZBADVERSION)
741830 continue ;
742831
832+ if (!responses.empty ())
833+ {
834+ zkutil::KeeperMultiException::check (code, requests, responses);
835+ }
743836 throw zkutil::KeeperException (code);
744837 }
745838
0 commit comments