Skip to content

Commit 623b2e5

Browse files
Merge pull request #10849 from ClickHouse/fix_optimize_and_alter_hangs
Fix mutations and OPTIMIZE hangs when replica becomes inactive
2 parents 496d90b + 63c6eb1 commit 623b2e5

5 files changed

Lines changed: 59 additions & 47 deletions

File tree

src/Common/ZooKeeper/ZooKeeper.cpp

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -629,51 +629,54 @@ namespace
629629
{
630630
struct WaitForDisappearState
631631
{
632-
int32_t code = 0;
633-
int32_t event_type = 0;
632+
std::atomic_int32_t code = 0;
633+
std::atomic_int32_t event_type = 0;
634634
Poco::Event event;
635635
};
636636
using WaitForDisappearStatePtr = std::shared_ptr<WaitForDisappearState>;
637637
}
638638

639-
void ZooKeeper::waitForDisappear(const std::string & path)
639+
bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition & condition)
640640
{
641641
WaitForDisappearStatePtr state = std::make_shared<WaitForDisappearState>();
642642

643-
while (true)
643+
auto callback = [state](const Coordination::ExistsResponse & response)
644644
{
645-
auto callback = [state](const Coordination::ExistsResponse & response)
646-
{
647-
state->code = response.error;
648-
if (state->code)
649-
state->event.set();
650-
};
645+
state->code = response.error;
646+
if (state->code)
647+
state->event.set();
648+
};
651649

652-
auto watch = [state](const Coordination::WatchResponse & response)
650+
auto watch = [state](const Coordination::WatchResponse & response)
651+
{
652+
if (!state->code)
653653
{
654+
state->code = response.error;
654655
if (!state->code)
655-
{
656-
state->code = response.error;
657-
if (!state->code)
658-
state->event_type = response.type;
659-
state->event.set();
660-
}
661-
};
656+
state->event_type = response.type;
657+
state->event.set();
658+
}
659+
};
662660

661+
while (!condition || !condition())
662+
{
663663
/// NOTE: if the node doesn't exist, the watch will leak.
664-
665664
impl->exists(path, callback, watch);
666-
state->event.wait();
665+
if (!condition)
666+
state->event.wait();
667+
else if (!state->event.tryWait(1000))
668+
continue;
667669

668670
if (state->code == Coordination::ZNONODE)
669-
return;
671+
return true;
670672

671673
if (state->code)
672674
throw KeeperException(state->code, path);
673675

674676
if (state->event_type == Coordination::DELETED)
675-
return;
677+
return true;
676678
}
679+
return false;
677680
}
678681

679682
ZooKeeperPtr ZooKeeper::startNewSession() const

src/Common/ZooKeeper/ZooKeeper.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,11 @@ class ZooKeeper
185185
/// Remove all children nodes (non recursive).
186186
void removeChildren(const std::string & path);
187187

188+
using WaitCondition = std::function<bool()>;
188189
/// Wait for the node to disappear or return immediately if it doesn't exist.
189-
void waitForDisappear(const std::string & path);
190+
/// If condition is speficied, it is used to return early (when condition returns false)
191+
/// The function returns true if waited and false if waiting was interrupted by condition.
192+
bool waitForDisappear(const std::string & path, const WaitCondition & condition = {});
190193

191194
/// Async interface (a small subset of operations is implemented).
192195
///

src/Storages/StorageReplicatedMergeTree.cpp

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
361361
else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation
362362
break; /// (numbers like 0000000000 and 0000000001)
363363

364-
/// We wait without timeout.
365-
wait_event->wait();
364+
/// Replica can become inactive, so wait with timeout and recheck it
365+
if (wait_event->tryWait(1000))
366+
break;
366367
}
367368

368369
if (partial_shutdown_called)
@@ -3841,7 +3842,8 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
38413842
{
38423843
if (wait_for_non_active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
38433844
{
3844-
waitForReplicaToProcessLogEntry(replica, entry);
3845+
if (!waitForReplicaToProcessLogEntry(replica, entry, wait_for_non_active))
3846+
unwaited.push_back(replica);
38453847
}
38463848
else
38473849
{
@@ -3854,7 +3856,7 @@ Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const Re
38543856
}
38553857

38563858

3857-
void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
3859+
bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
38583860
{
38593861
String entry_str = entry.toString();
38603862
String log_node_name;
@@ -3875,6 +3877,12 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
38753877
* To do this, check its node `log_pointer` - the maximum number of the element taken from `log` + 1.
38763878
*/
38773879

3880+
const auto & check_replica_become_inactive = [this, &replica]()
3881+
{
3882+
return !getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active");
3883+
};
3884+
constexpr auto event_wait_timeout_ms = 1000;
3885+
38783886
if (startsWith(entry.znode_name, "log-"))
38793887
{
38803888
/** In this case, just take the number from the node name `log-xxxxxxxxxx`.
@@ -3886,15 +3894,18 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
38863894
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
38873895

38883896
/// Let's wait until entry gets into the replica queue.
3889-
while (true)
3897+
while (wait_for_non_active || !check_replica_become_inactive())
38903898
{
38913899
zkutil::EventPtr event = std::make_shared<Poco::Event>();
38923900

38933901
String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
38943902
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
38953903
break;
38963904

3897-
event->wait();
3905+
if (wait_for_non_active)
3906+
event->wait();
3907+
else
3908+
event->tryWait(event_wait_timeout_ms);
38983909
}
38993910
}
39003911
else if (startsWith(entry.znode_name, "queue-"))
@@ -3931,15 +3942,18 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
39313942
LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");
39323943

39333944
/// Let's wait until the entry gets into the replica queue.
3934-
while (true)
3945+
while (wait_for_non_active || !check_replica_become_inactive())
39353946
{
39363947
zkutil::EventPtr event = std::make_shared<Poco::Event>();
39373948

39383949
String log_pointer_new = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
39393950
if (!log_pointer_new.empty() && parse<UInt64>(log_pointer_new) > log_index)
39403951
break;
39413952

3942-
event->wait();
3953+
if (wait_for_non_active)
3954+
event->wait();
3955+
else
3956+
event->tryWait(event_wait_timeout_ms);
39433957
}
39443958
}
39453959
}
@@ -3974,13 +3988,17 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
39743988
if (queue_entry_to_wait_for.empty())
39753989
{
39763990
LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes.");
3977-
return;
3991+
return true;
39783992
}
39793993

39803994
LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
39813995

3982-
/// Third - wait until the entry disappears from the replica queue.
3983-
getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
3996+
/// Third - wait until the entry disappears from the replica queue or replica become inactive.
3997+
String path_to_wait_on = zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for;
3998+
if (wait_for_non_active)
3999+
return getZooKeeper()->waitForDisappear(path_to_wait_on);
4000+
4001+
return getZooKeeper()->waitForDisappear(path_to_wait_on, check_replica_become_inactive);
39844002
}
39854003

39864004

src/Storages/StorageReplicatedMergeTree.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ class StorageReplicatedMergeTree final : public ext::shared_ptr_helper<StorageRe
486486
/** Wait until the specified replica executes the specified action from the log.
487487
* NOTE: See comment about locks above.
488488
*/
489-
void waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry);
489+
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
490490

491491
/// Choose leader replica, send requst to it and wait.
492492
void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);

tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.referece

Lines changed: 0 additions & 12 deletions
This file was deleted.

0 commit comments

Comments
 (0)