test_scheduler_highlevel_graph_unpack_import flaky #8480

@jrbourbeau

Description

From time to time, dask/tests/test_layers.py::test_scheduler_highlevel_graph_unpack_import fails in CI with the following error (full traceback further below):

                # Check whether we imported `lib` on the scheduler
>               assert not any(module.startswith(lib) for module in new_modules)
E               assert not True
E                +  where True = any(<generator object test_scheduler_highlevel_graph_unpack_import.<locals>.<genexpr> at 0x0000017314F18C80>)
Full traceback:
================================== FAILURES ===================================
___ test_scheduler_highlevel_graph_unpack_import[False-_pq_pyarrow-pandas.] ___
[gw1] win32 -- Python 3.8.12 C:\Miniconda3\envs\test-environment\python.exe

op = <function _pq_pyarrow at 0x000001730E265670>, lib = 'pandas.'
optimize_graph = False
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x00000173150F8730>
tmpdir = local('C:\\Users\\runneradmin\\AppData\\Local\\Temp\\pytest-of-runneradmin\\pytest-0\\popen-gw1\\test_scheduler_highlevel_graph6')

    @pytest.mark.parametrize(
        "op,lib",
        [
            (_dataframe_shuffle, "pandas."),
            (_dataframe_broadcast_join, "pandas."),
            (_pq_pyarrow, "pandas."),
            (_pq_fastparquet, "pandas."),
            (_read_csv, "pandas."),
            (_array_creation, "numpy."),
            (_array_map_overlap, "numpy."),
        ],
    )
    @pytest.mark.parametrize("optimize_graph", [True, False])
    def test_scheduler_highlevel_graph_unpack_import(op, lib, optimize_graph, loop, tmpdir):
        # Test that array/dataframe-specific modules are not imported
        # on the scheduler when HLG layers are unpacked/materialized.
    
        with cluster(scheduler_kwargs={"plugins": [SchedulerImportCheck(lib)]}) as (
            scheduler,
            workers,
        ):
            with Client(scheduler["address"], loop=loop) as c:
                # Perform a computation using a HighLevelGraph Layer
                c.compute(op(tmpdir), optimize_graph=optimize_graph)
    
                # Get the new modules which were imported on the scheduler during the computation
                end_modules = c.run_on_scheduler(lambda: set(sys.modules))
                start_modules = c.run_on_scheduler(
                    lambda dask_scheduler: dask_scheduler.plugins[
                        SchedulerImportCheck.name
                    ].start_modules
                )
                new_modules = end_modules - start_modules
    
                # Check that the scheduler didn't start with `lib`
                # (otherwise we aren't testing anything)
                assert not any(module.startswith(lib) for module in start_modules)
    
                # Check whether we imported `lib` on the scheduler
>               assert not any(module.startswith(lib) for module in new_modules)
E               assert not True
E                +  where True = any(<generator object test_scheduler_highlevel_graph_unpack_import.<locals>.<genexpr> at 0x0000017314F18C80>)

dask\tests\test_layers.py:188: AssertionError
---------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy

C:\Miniconda3\envs\test-environment\lib\site-packages\distributed\node.py:160: UserWarning: Port 8787 is already in use.

Perhaps you already have a cluster running?

Hosting the HTTP server on port 53842 instead

  warnings.warn(

distributed.scheduler - INFO - Clear task state

distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:53843

distributed.scheduler - INFO -   dashboard at:           127.0.0.1:53842

distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:53844

distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:53844

distributed.worker - INFO -          dashboard at:            127.0.0.1:53845

distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:53843

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO -               Threads:                          1

distributed.worker - INFO -                Memory:                   7.00 GiB

distributed.worker - INFO -       Local Directory: D:\a\dask\dask\_test_worker-2d864e31-db54-4f34-8d0e-0a362ad8a84b\dask-worker-space\worker-vluf95em

distributed.worker - INFO - -------------------------------------------------

distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53844', name: tcp://127.0.0.1:53844, status: undefined, memory: 0, processing: 0>

distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:53843

distributed.worker - INFO - -------------------------------------------------

distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53844

distributed.core - INFO - Starting established connection

distributed.core - INFO - Starting established connection

distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:53851

distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:53851

distributed.worker - INFO -          dashboard at:            127.0.0.1:53852

distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:53843

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO -               Threads:                          1

distributed.worker - INFO -                Memory:                   7.00 GiB

distributed.worker - INFO -       Local Directory: D:\a\dask\dask\_test_worker-52400da1-4b9c-4e1b-92e2-6b03a81db034\dask-worker-space\worker-wdah6tcy

distributed.worker - INFO - -------------------------------------------------

distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53851', name: tcp://127.0.0.1:53851, status: undefined, memory: 0, processing: 0>

distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53851

distributed.core - INFO - Starting established connection

distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:53843

distributed.worker - INFO - -------------------------------------------------

distributed.core - INFO - Starting established connection

distributed.scheduler - INFO - Receive client connection: Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291

distributed.core - INFO - Starting established connection

distributed.worker - INFO - Run out-of-band function 'lambda'

distributed.worker - INFO - Run out-of-band function 'lambda'

distributed.scheduler - INFO - Remove client Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291

distributed.scheduler - INFO - Remove client Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291

distributed.scheduler - INFO - Close client connection: Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291

distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53851

distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53844

distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:53844', name: tcp://127.0.0.1:53844, status: running, memory: 0, processing: 0>

distributed.core - INFO - Removing comms to tcp://127.0.0.1:53844

____ test_scheduler_highlevel_graph_unpack_import[False-_read_csv-pandas.] ____
[gw1] win32 -- Python 3.8.12 C:\Miniconda3\envs\test-environment\python.exe

op = <function _read_csv at 0x000001730E265790>, lib = 'pandas.'
optimize_graph = False
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x0000017314F577F0>
tmpdir = local('C:\\Users\\runneradmin\\AppData\\Local\\Temp\\pytest-of-runneradmin\\pytest-0\\popen-gw1\\test_scheduler_highlevel_graph8')

    @pytest.mark.parametrize(
        "op,lib",
        [
            (_dataframe_shuffle, "pandas."),
            (_dataframe_broadcast_join, "pandas."),
            (_pq_pyarrow, "pandas."),
            (_pq_fastparquet, "pandas."),
            (_read_csv, "pandas."),
            (_array_creation, "numpy."),
            (_array_map_overlap, "numpy."),
        ],
    )
    @pytest.mark.parametrize("optimize_graph", [True, False])
    def test_scheduler_highlevel_graph_unpack_import(op, lib, optimize_graph, loop, tmpdir):
        # Test that array/dataframe-specific modules are not imported
        # on the scheduler when HLG layers are unpacked/materialized.
    
        with cluster(scheduler_kwargs={"plugins": [SchedulerImportCheck(lib)]}) as (
            scheduler,
            workers,
        ):
            with Client(scheduler["address"], loop=loop) as c:
                # Perform a computation using a HighLevelGraph Layer
                c.compute(op(tmpdir), optimize_graph=optimize_graph)
    
                # Get the new modules which were imported on the scheduler during the computation
                end_modules = c.run_on_scheduler(lambda: set(sys.modules))
                start_modules = c.run_on_scheduler(
                    lambda dask_scheduler: dask_scheduler.plugins[
                        SchedulerImportCheck.name
                    ].start_modules
                )
                new_modules = end_modules - start_modules
    
                # Check that the scheduler didn't start with `lib`
                # (otherwise we aren't testing anything)
                assert not any(module.startswith(lib) for module in start_modules)
    
                # Check whether we imported `lib` on the scheduler
>               assert not any(module.startswith(lib) for module in new_modules)
E               assert not True
E                +  where True = any(<generator object test_scheduler_highlevel_graph_unpack_import.<locals>.<genexpr> at 0x000001731435D4A0>)

dask\tests\test_layers.py:188: AssertionError
---------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy

C:\Miniconda3\envs\test-environment\lib\site-packages\distributed\node.py:160: UserWarning: Port 8787 is already in use.

Perhaps you already have a cluster running?

Hosting the HTTP server on port 53912 instead

  warnings.warn(

distributed.scheduler - INFO - Clear task state

distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:53913

distributed.scheduler - INFO -   dashboard at:           127.0.0.1:53912

distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:53914

distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:53914

distributed.worker - INFO -          dashboard at:            127.0.0.1:53915

distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:53913

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO -               Threads:                          1

distributed.worker - INFO -                Memory:                   7.00 GiB

distributed.worker - INFO -       Local Directory: D:\a\dask\dask\_test_worker-8101a517-fdfe-49d9-a1b7-96142cd7142f\dask-worker-space\worker-w913isno

distributed.worker - INFO - -------------------------------------------------

distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53914', name: tcp://127.0.0.1:53914, status: undefined, memory: 0, processing: 0>

distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53914

distributed.core - INFO - Starting established connection

distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:53913

distributed.worker - INFO - -------------------------------------------------

distributed.core - INFO - Starting established connection

distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:53921

distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:53921

distributed.worker - INFO -          dashboard at:            127.0.0.1:53922

distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:53913

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO -               Threads:                          1

distributed.worker - INFO -                Memory:                   7.00 GiB

distributed.worker - INFO -       Local Directory: D:\a\dask\dask\_test_worker-05a9730f-b3ce-4f8a-88b8-35fa94c0a60d\dask-worker-space\worker-abxa6lx5

distributed.worker - INFO - -------------------------------------------------

distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53921', name: tcp://127.0.0.1:53921, status: undefined, memory: 0, processing: 0>

distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53921

distributed.core - INFO - Starting established connection

distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:53913

distributed.worker - INFO - -------------------------------------------------

distributed.core - INFO - Starting established connection

distributed.scheduler - INFO - Receive client connection: Client-ee1c03db-5c41-11ec-9070-6045bd7a8291

distributed.core - INFO - Starting established connection

distributed.worker - INFO - Run out-of-band function 'lambda'

distributed.worker - INFO - Run out-of-band function 'lambda'

distributed.scheduler - INFO - Remove client Client-ee1c03db-5c41-11ec-9070-6045bd7a8291

distributed.scheduler - INFO - Remove client Client-ee1c03db-5c41-11ec-9070-6045bd7a8291

distributed.scheduler - INFO - Close client connection: Client-ee1c03db-5c41-11ec-9070-6045bd7a8291

distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53914

distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:53914', name: tcp://127.0.0.1:53914, status: running, memory: 0, processing: 0>

distributed.core - INFO - Removing comms to tcp://127.0.0.1:53914

distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53921

distributed.scheduler - INFO - Scheduler closing...
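For reference, the import-detection technique the test relies on can be reproduced with the standard library alone: snapshot `sys.modules`, run some work, diff the two snapshots, and check the newly imported module names against a prefix. Below is a minimal, self-contained sketch of that pattern; the helper name `find_new_imports` is illustrative only and is not part of dask or distributed.

```python
import sys


def find_new_imports(prefix, action):
    """Run ``action`` and return newly imported modules whose names match ``prefix``.

    Mirrors the test's technique: snapshot sys.modules before the work,
    snapshot again after, diff the two sets, and filter by name prefix.
    """
    start_modules = set(sys.modules)
    action()
    end_modules = set(sys.modules)
    new_modules = end_modules - start_modules
    return sorted(m for m in new_modules if m.startswith(prefix))


# Importing json should not pull in any "xml."-prefixed modules
assert find_new_imports("xml.", lambda: __import__("json")) == []
```

Note that this in-process version checks imports triggered by the current process; in the test, the same diff is computed on the scheduler process via `run_on_scheduler` and a `SchedulerImportCheck` plugin that records `start_modules` at scheduler startup.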

Metadata

Assignees

No one assigned

    Labels

    needs attention: It's been a while since this was pushed on. Needs attention from the owner or a maintainer.
    tests: Unit tests and/or continuous integration
