Status: Open
Labels: needs attention (Needs attention from the owner or a maintainer), tests (Unit tests and/or continuous integration)
Description
From time to time, `dask/tests/test_layers.py::test_scheduler_highlevel_graph_unpack_import` fails in CI with the following error (full traceback further below):
# Check whether we imported `lib` on the scheduler
> assert not any(module.startswith(lib) for module in new_modules)
E assert not True
E        +  where True = any(<generator object test_scheduler_highlevel_graph_unpack_import.<locals>.<genexpr> at 0x0000017314F18C80>)

Full traceback:
================================== FAILURES ===================================
___ test_scheduler_highlevel_graph_unpack_import[False-_pq_pyarrow-pandas.] ___
[gw1] win32 -- Python 3.8.12 C:\Miniconda3\envs\test-environment\python.exe
op = <function _pq_pyarrow at 0x000001730E265670>, lib = 'pandas.'
optimize_graph = False
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x00000173150F8730>
tmpdir = local('C:\\Users\\runneradmin\\AppData\\Local\\Temp\\pytest-of-runneradmin\\pytest-0\\popen-gw1\\test_scheduler_highlevel_graph6')
@pytest.mark.parametrize(
"op,lib",
[
(_dataframe_shuffle, "pandas."),
(_dataframe_broadcast_join, "pandas."),
(_pq_pyarrow, "pandas."),
(_pq_fastparquet, "pandas."),
(_read_csv, "pandas."),
(_array_creation, "numpy."),
(_array_map_overlap, "numpy."),
],
)
@pytest.mark.parametrize("optimize_graph", [True, False])
def test_scheduler_highlevel_graph_unpack_import(op, lib, optimize_graph, loop, tmpdir):
# Test that array/dataframe-specific modules are not imported
# on the scheduler when an HLG layers are unpacked/materialized.
with cluster(scheduler_kwargs={"plugins": [SchedulerImportCheck(lib)]}) as (
scheduler,
workers,
):
with Client(scheduler["address"], loop=loop) as c:
# Perform a computation using a HighLevelGraph Layer
c.compute(op(tmpdir), optimize_graph=optimize_graph)
# Get the new modules which were imported on the scheduler during the computation
end_modules = c.run_on_scheduler(lambda: set(sys.modules))
start_modules = c.run_on_scheduler(
lambda dask_scheduler: dask_scheduler.plugins[
SchedulerImportCheck.name
].start_modules
)
new_modules = end_modules - start_modules
# Check that the scheduler didn't start with `lib`
# (otherwise we arent testing anything)
assert not any(module.startswith(lib) for module in start_modules)
# Check whether we imported `lib` on the scheduler
> assert not any(module.startswith(lib) for module in new_modules)
E assert not True
E + where True = any(<generator object test_scheduler_highlevel_graph_unpack_import.<locals>.<genexpr> at 0x0000017314F18C80>)
dask\tests\test_layers.py:188: AssertionError
---------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
C:\Miniconda3\envs\test-environment\lib\site-packages\distributed\node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 53842 instead
warnings.warn(
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:53843
distributed.scheduler - INFO - dashboard at: 127.0.0.1:53842
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:53844
distributed.worker - INFO - Listening to: tcp://127.0.0.1:53844
distributed.worker - INFO - dashboard at: 127.0.0.1:53845
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:53843
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 7.00 GiB
distributed.worker - INFO - Local Directory: D:\a\dask\dask\_test_worker-2d864e31-db54-4f34-8d0e-0a362ad8a84b\dask-worker-space\worker-vluf95em
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53844', name: tcp://127.0.0.1:53844, status: undefined, memory: 0, processing: 0>
distributed.worker - INFO - Registered to: tcp://127.0.0.1:53843
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53844
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:53851
distributed.worker - INFO - Listening to: tcp://127.0.0.1:53851
distributed.worker - INFO - dashboard at: 127.0.0.1:53852
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:53843
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 7.00 GiB
distributed.worker - INFO - Local Directory: D:\a\dask\dask\_test_worker-52400da1-4b9c-4e1b-92e2-6b03a81db034\dask-worker-space\worker-wdah6tcy
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53851', name: tcp://127.0.0.1:53851, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53851
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:53843
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Run out-of-band function 'lambda'
distributed.worker - INFO - Run out-of-band function 'lambda'
distributed.scheduler - INFO - Remove client Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291
distributed.scheduler - INFO - Remove client Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291
distributed.scheduler - INFO - Close client connection: Client-e6d1e9ae-5c41-11ec-9070-6045bd7a8291
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53851
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53844
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:53844', name: tcp://127.0.0.1:53844, status: running, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:53844
____ test_scheduler_highlevel_graph_unpack_import[False-_read_csv-pandas.] ____
[gw1] win32 -- Python 3.8.12 C:\Miniconda3\envs\test-environment\python.exe
op = <function _read_csv at 0x000001730E265790>, lib = 'pandas.'
optimize_graph = False
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x0000017314F577F0>
tmpdir = local('C:\\Users\\runneradmin\\AppData\\Local\\Temp\\pytest-of-runneradmin\\pytest-0\\popen-gw1\\test_scheduler_highlevel_graph8')
@pytest.mark.parametrize(
"op,lib",
[
(_dataframe_shuffle, "pandas."),
(_dataframe_broadcast_join, "pandas."),
(_pq_pyarrow, "pandas."),
(_pq_fastparquet, "pandas."),
(_read_csv, "pandas."),
(_array_creation, "numpy."),
(_array_map_overlap, "numpy."),
],
)
@pytest.mark.parametrize("optimize_graph", [True, False])
def test_scheduler_highlevel_graph_unpack_import(op, lib, optimize_graph, loop, tmpdir):
# Test that array/dataframe-specific modules are not imported
# on the scheduler when an HLG layers are unpacked/materialized.
with cluster(scheduler_kwargs={"plugins": [SchedulerImportCheck(lib)]}) as (
scheduler,
workers,
):
with Client(scheduler["address"], loop=loop) as c:
# Perform a computation using a HighLevelGraph Layer
c.compute(op(tmpdir), optimize_graph=optimize_graph)
# Get the new modules which were imported on the scheduler during the computation
end_modules = c.run_on_scheduler(lambda: set(sys.modules))
start_modules = c.run_on_scheduler(
lambda dask_scheduler: dask_scheduler.plugins[
SchedulerImportCheck.name
].start_modules
)
new_modules = end_modules - start_modules
# Check that the scheduler didn't start with `lib`
# (otherwise we arent testing anything)
assert not any(module.startswith(lib) for module in start_modules)
# Check whether we imported `lib` on the scheduler
> assert not any(module.startswith(lib) for module in new_modules)
E assert not True
E + where True = any(<generator object test_scheduler_highlevel_graph_unpack_import.<locals>.<genexpr> at 0x000001731435D4A0>)
dask\tests\test_layers.py:188: AssertionError
---------------------------- Captured stderr call -----------------------------
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
C:\Miniconda3\envs\test-environment\lib\site-packages\distributed\node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 53912 instead
warnings.warn(
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:53913
distributed.scheduler - INFO - dashboard at: 127.0.0.1:53912
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:53914
distributed.worker - INFO - Listening to: tcp://127.0.0.1:53914
distributed.worker - INFO - dashboard at: 127.0.0.1:53915
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:53913
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 7.00 GiB
distributed.worker - INFO - Local Directory: D:\a\dask\dask\_test_worker-8101a517-fdfe-49d9-a1b7-96142cd7142f\dask-worker-space\worker-w913isno
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53914', name: tcp://127.0.0.1:53914, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53914
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:53913
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Start worker at: tcp://127.0.0.1:53921
distributed.worker - INFO - Listening to: tcp://127.0.0.1:53921
distributed.worker - INFO - dashboard at: 127.0.0.1:53922
distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:53913
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 7.00 GiB
distributed.worker - INFO - Local Directory: D:\a\dask\dask\_test_worker-05a9730f-b3ce-4f8a-88b8-35fa94c0a60d\dask-worker-space\worker-abxa6lx5
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:53921', name: tcp://127.0.0.1:53921, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:53921
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Registered to: tcp://127.0.0.1:53913
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-ee1c03db-5c41-11ec-9070-6045bd7a8291
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Run out-of-band function 'lambda'
distributed.worker - INFO - Run out-of-band function 'lambda'
distributed.scheduler - INFO - Remove client Client-ee1c03db-5c41-11ec-9070-6045bd7a8291
distributed.scheduler - INFO - Remove client Client-ee1c03db-5c41-11ec-9070-6045bd7a8291
distributed.scheduler - INFO - Close client connection: Client-ee1c03db-5c41-11ec-9070-6045bd7a8291
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53914
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:53914', name: tcp://127.0.0.1:53914, status: running, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:53914
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:53921
distributed.scheduler - INFO - Scheduler closing...