-
-
Notifications
You must be signed in to change notification settings - Fork 757
Closed
Labels
flaky test: Intermittent failures on CI.
Description
____________________________ test_release_failure _____________________________
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:63277', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:63278', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:63280', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(
client=True,
config={
"distributed.scheduler.locks.lease-timeout": "100ms",
"distributed.scheduler.locks.lease-validation-interval": "100ms",
},
)
async def test_release_failure(c, s, a, b):
"""Don't raise even if release fails: lease will be cleaned up by the lease-validation after
a specified interval anyways (see config parameters used)."""
with dask.config.set({"distributed.comm.retry.count": 1}):
pool = await FlakyConnectionPool(failing_connections=5)
semaphore = await Semaphore(
max_leases=2,
name="resource_we_want_to_limit",
scheduler_rpc=pool(s.address),
)
await semaphore.acquire()
pool.activate() # Comm chaos starts
# Release fails (after a single retry) because of broken connections
with captured_logger(
"distributed.semaphore", level=logging.ERROR
) as semaphore_log:
with captured_logger("distributed.utils_comm") as retry_log:
assert await semaphore.release() is False
with captured_logger(
"distributed.semaphore", level=logging.DEBUG
) as semaphore_cleanup_log:
pool.deactivate() # comm chaos stops
assert await semaphore.get_value() == 1 # lease is still registered
await asyncio.sleep(0.2) # Wait for lease to be cleaned up
# Check release was retried
retry_log = retry_log.getvalue().split("\n")[0]
assert retry_log.startswith(
"Retrying semaphore release:"
) and retry_log.endswith("after exception in attempt 0/1: ")
# Check release failed
semaphore_log = semaphore_log.getvalue().split("\n")[0]
assert semaphore_log.startswith(
"Release failed for id="
) and semaphore_log.endswith("Cluster network might be unstable?")
# Check lease has timed out
> assert any(
log.startswith("Lease") and "timed out after" in log
for log in semaphore_cleanup_log.getvalue().split("\n")
)
E assert False
E + where False = any(<generator object test_release_failure.<locals>.<genexpr> at 0x000002064636ABA0>)
distributed\tests\test_semaphore.py:596: AssertionError

cc @fjetter
https://github.com/dask/distributed/runs/6148590819?check_suite_focus=true
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
flaky test: Intermittent failures on CI.