Skip to content

Flaky `test_workspace_concurrency_intense` (#4766)

@jrbourbeau

Description

@jrbourbeau

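For example, see this CI build (the link was not preserved in this copy). In that run the test created 2100 work directories, but the child processes reported purging none of them (`n_purged == 0`). As a result, the final assertion `n_purged >= 0.5 * n_created > 0` failed.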

Full traceback:
______________________ test_workspace_concurrency_intense ______________________

tmpdir = local('/private/var/folders/24/8k48jl6d249_n_qfxwsl6xvm0000gn/T/pytest-of-runner/pytest-0/test_workspace_concurrency_int10')

    @pytest.mark.flaky(reruns=10, reruns_delay=5, condition=MACOS)
    @pytest.mark.slow
    def test_workspace_concurrency_intense(tmpdir):
>       n_created, n_purged = _test_workspace_concurrency(tmpdir, 8.0, 16)

distributed/tests/test_diskutils.py:287: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

tmpdir = local('/private/var/folders/24/8k48jl6d249_n_qfxwsl6xvm0000gn/T/pytest-of-runner/pytest-0/test_workspace_concurrency_int10')
timeout = 8.0, max_procs = 16

    def _test_workspace_concurrency(tmpdir, timeout, max_procs):
        """
        WorkSpace concurrency test.  We merely check that no exception or
        deadlock happens.
        """
        base_dir = str(tmpdir)
    
        err_q = mp_context.Queue()
        purged_q = mp_context.Queue()
        stop_evt = mp_context.Event()
        ws = WorkSpace(base_dir)
        # Make sure purging only happens in the child processes
        ws._purge_leftovers = lambda: None
    
        # Run a bunch of child processes that will try to purge concurrently
        NPROCS = 2 if sys.platform == "win32" else max_procs
        processes = [
            mp_context.Process(
                target=_workspace_concurrency, args=(base_dir, purged_q, err_q, stop_evt)
            )
            for i in range(NPROCS)
        ]
        for p in processes:
            p.start()
    
        n_created = 0
        n_purged = 0
        try:
            t1 = time()
            while time() - t1 < timeout:
                # Add a bunch of locks, and simulate forgetting them.
                # The concurrent processes should try to purge them.
                for i in range(50):
                    d = ws.new_work_dir(prefix="workspace-concurrency-")
                    d._finalizer.detach()
                    n_created += 1
                sleep(1e-2)
        finally:
            stop_evt.set()
            for p in processes:
                p.join()
    
        # Any errors?
        try:
            err = err_q.get_nowait()
        except queue.Empty:
            pass
        else:
            raise err
    
        try:
            while True:
                n_purged += purged_q.get_nowait()
        except queue.Empty:
            pass
        # We attempted to purge most directories at some point
>       assert n_purged >= 0.5 * n_created > 0
E       assert 0 >= (0.5 * 2100)

distributed/tests/test_diskutils.py:272: AssertionError

Metadata

Assignees

No one assigned

    Labels

    flaky test: Intermittent failures on CI.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions