Skip to content

Intermittent failure in Semaphore tests: test_release_failure #6190

@mrocklin

Description

@mrocklin
____________________________ test_release_failure _____________________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:63277', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:63278', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:63280', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(
        client=True,
        config={
            "distributed.scheduler.locks.lease-timeout": "100ms",
            "distributed.scheduler.locks.lease-validation-interval": "100ms",
        },
    )
    async def test_release_failure(c, s, a, b):
        """Don't raise even if release fails: lease will be cleaned up by the lease-validation after
        a specified interval anyways (see config parameters used)."""
    
        with dask.config.set({"distributed.comm.retry.count": 1}):
            pool = await FlakyConnectionPool(failing_connections=5)
    
            semaphore = await Semaphore(
                max_leases=2,
                name="resource_we_want_to_limit",
                scheduler_rpc=pool(s.address),
            )
            await semaphore.acquire()
            pool.activate()  # Comm chaos starts
    
            # Release fails (after a single retry) because of broken connections
            with captured_logger(
                "distributed.semaphore", level=logging.ERROR
            ) as semaphore_log:
                with captured_logger("distributed.utils_comm") as retry_log:
                    assert await semaphore.release() is False
    
            with captured_logger(
                "distributed.semaphore", level=logging.DEBUG
            ) as semaphore_cleanup_log:
                pool.deactivate()  # comm chaos stops
                assert await semaphore.get_value() == 1  # lease is still registered
                await asyncio.sleep(0.2)  # Wait for lease to be cleaned up
    
            # Check release was retried
            retry_log = retry_log.getvalue().split("\n")[0]
            assert retry_log.startswith(
                "Retrying semaphore release:"
            ) and retry_log.endswith("after exception in attempt 0/1: ")
            # Check release failed
            semaphore_log = semaphore_log.getvalue().split("\n")[0]
            assert semaphore_log.startswith(
                "Release failed for id="
            ) and semaphore_log.endswith("Cluster network might be unstable?")
    
            # Check lease has timed out
>           assert any(
                log.startswith("Lease") and "timed out after" in log
                for log in semaphore_cleanup_log.getvalue().split("\n")
            )
E           assert False
E            +  where False = any(<generator object test_release_failure.<locals>.<genexpr> at 0x000002064636ABA0>)

distributed\tests\test_semaphore.py:596: AssertionError

cc @fjetter

https://github.com/dask/distributed/runs/6148590819?check_suite_focus=true

Metadata

Metadata

Assignees

Labels

flaky test — Intermittent failures on CI.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions