@@ -361,8 +361,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
361361 else if (mutation_pointer_value >= mutation_id) // / Maybe we already processed more fresh mutation
362362 break ; // / (numbers like 0000000000 and 0000000001)
363363
364- // / We wait without timeout.
365- wait_event->wait ();
364+ // / Replica can become inactive, so wait with timeout and recheck it
365+ if (wait_event->tryWait (1000 ))
366+ break ;
366367 }
367368
368369 if (partial_shutdown_called)
@@ -3841,7 +3842,8 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
38413842 {
38423843 if (wait_for_non_active || zookeeper->exists (zookeeper_path + " /replicas/" + replica + " /is_active" ))
38433844 {
3844- waitForReplicaToProcessLogEntry (replica, entry);
3845+ if (!waitForReplicaToProcessLogEntry (replica, entry, wait_for_non_active))
3846+ unwaited.push_back (replica);
38453847 }
38463848 else
38473849 {
@@ -3854,7 +3856,7 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
38543856}
38553857
38563858
3857- void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry (const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
3859+ bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry (const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active )
38583860{
38593861 String entry_str = entry.toString ();
38603862 String log_node_name;
@@ -3875,6 +3877,12 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
38753877 * To do this, check its node `log_pointer` - the maximum number of the element taken from `log` + 1.
38763878 */
38773879
3880+ const auto & check_replica_become_inactive = [this , &replica]()
3881+ {
3882+ return !getZooKeeper ()->exists (zookeeper_path + " /replicas/" + replica + " /is_active" );
3883+ };
3884+ constexpr auto event_wait_timeout_ms = 1000 ;
3885+
38783886 if (startsWith (entry.znode_name , " log-" ))
38793887 {
38803888 /* * In this case, just take the number from the node name `log-xxxxxxxxxx`.
@@ -3886,15 +3894,18 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
38863894 LOG_DEBUG (log, " Waiting for " << replica << " to pull " << log_node_name << " to queue" );
38873895
38883896 // / Let's wait until entry gets into the replica queue.
3889- while (true )
3897+ while (wait_for_non_active || ! check_replica_become_inactive () )
38903898 {
38913899 zkutil::EventPtr event = std::make_shared<Poco::Event>();
38923900
38933901 String log_pointer = getZooKeeper ()->get (zookeeper_path + " /replicas/" + replica + " /log_pointer" , nullptr , event);
38943902 if (!log_pointer.empty () && parse<UInt64>(log_pointer) > log_index)
38953903 break ;
38963904
3897- event->wait ();
3905+ if (wait_for_non_active)
3906+ event->wait ();
3907+ else
3908+ event->tryWait (event_wait_timeout_ms);
38983909 }
38993910 }
39003911 else if (startsWith (entry.znode_name , " queue-" ))
@@ -3931,15 +3942,18 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
39313942 LOG_DEBUG (log, " Waiting for " << replica << " to pull " << log_node_name << " to queue" );
39323943
39333944 // / Let's wait until the entry gets into the replica queue.
3934- while (true )
3945+ while (wait_for_non_active || ! check_replica_become_inactive () )
39353946 {
39363947 zkutil::EventPtr event = std::make_shared<Poco::Event>();
39373948
39383949 String log_pointer_new = getZooKeeper ()->get (zookeeper_path + " /replicas/" + replica + " /log_pointer" , nullptr , event);
39393950 if (!log_pointer_new.empty () && parse<UInt64>(log_pointer_new) > log_index)
39403951 break ;
39413952
3942- event->wait ();
3953+ if (wait_for_non_active)
3954+ event->wait ();
3955+ else
3956+ event->tryWait (event_wait_timeout_ms);
39433957 }
39443958 }
39453959 }
@@ -3974,13 +3988,17 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
39743988 if (queue_entry_to_wait_for.empty ())
39753989 {
39763990 LOG_DEBUG (log, " No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size () << " nodes." );
3977- return ;
3991+ return true ;
39783992 }
39793993
39803994 LOG_DEBUG (log, " Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue" );
39813995
3982- // / Third - wait until the entry disappears from the replica queue.
3983- getZooKeeper ()->waitForDisappear (zookeeper_path + " /replicas/" + replica + " /queue/" + queue_entry_to_wait_for);
3996+ // / Third - wait until the entry disappears from the replica queue or the replica becomes inactive.
3997+ String path_to_wait_on = zookeeper_path + " /replicas/" + replica + " /queue/" + queue_entry_to_wait_for;
3998+ if (wait_for_non_active)
3999+ return getZooKeeper ()->waitForDisappear (path_to_wait_on);
4000+
4001+ return getZooKeeper ()->waitForDisappear (path_to_wait_on, check_replica_become_inactive);
39844002}
39854003
39864004
0 commit comments