Skip to content

Commit 8bb656d

Browse files
committed
Add context manager for partition manager
1 parent 3538886 commit 8bb656d

1 file changed

Lines changed: 45 additions & 40 deletions

File tree

  • tests/integration/test_quorum_inserts

tests/integration/test_quorum_inserts/test.py

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -379,50 +379,55 @@ def test_insert_quorum_with_keeper_loss_connection(started_cluster):
379379
)
380380
)
381381

382-
pm = PartitionManager()
383-
pm.drop_instance_zk_connections(zero)
384-
385-
retries = 0
386-
zk = cluster.get_kazoo_client("zoo1")
387-
while True:
388-
if (
389-
zk.exists(f"/clickhouse/tables/{table_name}/replicas/zero/is_active")
390-
is None
391-
):
392-
break
393-
print("replica is still active")
394-
time.sleep(1)
395-
retries += 1
396-
if retries == 120:
397-
raise Exception("Can not wait cluster replica inactive")
398-
399-
first.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
400-
quorum_fail_future = executor.submit(
401-
lambda: first.query(
402-
"SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
382+
with PartitionManager() as pm:
383+
pm.drop_instance_zk_connections(zero)
384+
385+
retries = 0
386+
zk = cluster.get_kazoo_client("zoo1")
387+
while True:
388+
if (
389+
zk.exists(
390+
f"/clickhouse/tables/{table_name}/replicas/zero/is_active"
391+
)
392+
is None
393+
):
394+
break
395+
print("replica is still active")
396+
time.sleep(1)
397+
retries += 1
398+
if retries == 120:
399+
raise Exception("Can not wait cluster replica inactive")
400+
401+
first.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
402+
quorum_fail_future = executor.submit(
403+
lambda: first.query(
404+
"SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
405+
)
403406
)
404-
)
405-
first.query(f"SYSTEM START FETCHES {table_name}")
407+
first.query(f"SYSTEM START FETCHES {table_name}")
406408

407-
concurrent.futures.wait([quorum_fail_future])
409+
concurrent.futures.wait([quorum_fail_future])
408410

409-
assert quorum_fail_future.exception() is None
411+
assert quorum_fail_future.exception() is None
410412

411-
zero.query("SYSTEM ENABLE FAILPOINT finish_clean_quorum_failed_parts")
412-
clean_quorum_fail_parts_future = executor.submit(
413-
lambda: first.query(
414-
"SYSTEM WAIT FAILPOINT finish_clean_quorum_failed_parts", timeout=300
413+
zero.query("SYSTEM ENABLE FAILPOINT finish_clean_quorum_failed_parts")
414+
clean_quorum_fail_parts_future = executor.submit(
415+
lambda: first.query(
416+
"SYSTEM WAIT FAILPOINT finish_clean_quorum_failed_parts",
417+
timeout=300,
418+
)
415419
)
416-
)
417-
pm.restore_instance_zk_connections(zero)
418-
concurrent.futures.wait([clean_quorum_fail_parts_future])
420+
pm.restore_instance_zk_connections(zero)
421+
concurrent.futures.wait([clean_quorum_fail_parts_future])
419422

420-
assert clean_quorum_fail_parts_future.exception() is None
423+
assert clean_quorum_fail_parts_future.exception() is None
421424

422-
zero.query("SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
423-
concurrent.futures.wait([insert_future])
424-
assert insert_future.exception() is not None
425-
assert not zero.contains_in_log("LOGICAL_ERROR")
426-
assert zero.contains_in_log(
427-
"fails to commit and will not retry or clean garbage"
428-
)
425+
zero.query(
426+
"SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause"
427+
)
428+
concurrent.futures.wait([insert_future])
429+
assert insert_future.exception() is not None
430+
assert not zero.contains_in_log("LOGICAL_ERROR")
431+
assert zero.contains_in_log(
432+
"fails to commit and will not retry or clean garbage"
433+
)

0 commit comments

Comments
 (0)