Conversation
| self.periodic_callbacks["cluster-repr"] = pc | ||
| pc.start() | ||
|
|
||
| self.loop.add_callback(install) |
There was a problem hiding this comment.
the PeriodicCallbacks in self.periodic_callbacks need to be started and stopped on the same loop as self.loop because they bind to IOLoop.current() when PeriodicCallback.start is called
When running a synchronous Cluster the loop will be running in a different thread to where cluster._ipython_widget_ is called from
Unit Test Results 15 files ±0 15 suites ±0 6h 17m 2s ⏱️ - 3m 47s For more details on these failures, see this check. Results for commit 7dc400f. ± Comparison against base commit 046ab17. ♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Nice -- thanks @graingert. Since the cluster widget is a high-visibility portion of the codebase, could you add a test for this?
EDIT: To be clear, I trust that the changes you've made are correct. I just want to avoid a future regression
2714ca2 to
7dc400f
Compare
| processes=False, | ||
| ) as cluster: | ||
| cluster._ipython_display_() | ||
| assert cluster.sync(get_ioloop, cluster) is loop |
There was a problem hiding this comment.
here's the test failing on the main branch:
distributed/deploy/tests/test_local.py::test_ipywidgets_loop FAILED [100%]
======================================================================================================================================== FAILURES ========================================================================================================================================
__________________________________________________________________________________________________________________________________ test_ipywidgets_loop __________________________________________________________________________________________________________________________________
loop = <tornado.platform.asyncio.AsyncIOLoop object at 0x7ff32fa29900>
def test_ipywidgets_loop(loop):
"""
Previously cluster._ipython_display_ attached the PeriodicCallback to the
currently running loop, See https://github.com/dask/distributed/pull/6444
"""
ipywidgets = pytest.importorskip("ipywidgets")
async def get_ioloop(cluster):
return cluster.periodic_callbacks["cluster-repr"].io_loop
async def amain():
# running synchronous code in an async context to setup a
# IOLoop.current() that's different from cluster.loop
with LocalCluster(
n_workers=0,
silence_logs=False,
loop=loop,
dashboard_address=":0",
processes=False,
) as cluster:
cluster._ipython_display_()
assert cluster.sync(get_ioloop, cluster) is loop
box = cluster._cached_widget
assert isinstance(box, ipywidgets.Widget)
> asyncio.run(amain())
distributed/deploy/tests/test_local.py:609:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/runners.py:44: in run
return loop.run_until_complete(main)
../../miniconda3/envs/dask-distributed/lib/python3.10/asyncio/base_events.py:646: in run_until_complete
return future.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
async def amain():
# running synchronous code in an async context to setup a
# IOLoop.current() that's different from cluster.loop
with LocalCluster(
n_workers=0,
silence_logs=False,
loop=loop,
dashboard_address=":0",
processes=False,
) as cluster:
cluster._ipython_display_()
> assert cluster.sync(get_ioloop, cluster) is loop
E AssertionError: assert <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7ff2d943ee00> is <tornado.platform.asyncio.AsyncIOLoop object at 0x7ff32fa29900>
E + where <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7ff2d943ee00> = <bound method SyncMethodMixin.sync of LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B)>(<function test_ipywidgets_loop.<locals>.get_ioloop at 0x7ff32f9df250>, LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B))
E + where <bound method SyncMethodMixin.sync of LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B)> = LocalCluster(f8c7cda6, 'inproc://192.168.50.210/64690/1', workers=0, threads=0, memory=0 B).sync
distributed/deploy/tests/test_local.py:605: AssertionError
|
Heads up @hendrikmakait , your recent test may intermittently fail _______________________ test_watch_requires_lock_to_run ________________________
def test_watch_requires_lock_to_run():
start = time()
def stop_lock():
return time() > start + 0.600
def stop_profile():
return time() > start + 0.500
def hold_lock(stop):
with lock:
while not stop():
sleep(0.1)
start_threads = threading.active_count()
# Hog the lock over the entire duration of watch
thread = threading.Thread(
target=hold_lock, name="Hold Lock", kwargs={"stop": stop_lock}
)
thread.daemon = True
thread.start()
log = watch(interval="10ms", cycle="50ms", stop=stop_profile)
start = time() # wait until thread starts up
while threading.active_count() < start_threads + 2:
assert time() < start + 2
sleep(0.01)
sleep(0.5)
> assert len(log) == 0
E AssertionError: assert 1 == 0
E + where 1 = len(deque([(1653559454.046232, {'children': {'<module>;/Users/runner/miniconda3/envs/dask-distributed/bin/pytest;4': {'children': {'console_main;/Users/runner/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/_pytest/config/__init__.py;180': {...}}, 'count': 1, 'description': {'filename': '/Users/runner/miniconda3/envs/dask-distributed/bin/pytest', 'line': 'sys.exit(console_main())\n', 'line_number': 11, 'name': '<module>'}, 'identifier': '<module>;/Users/runner/miniconda3/envs/dask-distributed/bin/pytest;4'}}, 'count': 1, 'description': {'filename': '', 'line': '', 'line_number': 0, 'name': ''}, 'identifier': 'root'})]))
distributed/tests/test_profile.py:242: AssertionError |
|
Thank you @graingert . This looks good to me. Merging. |
distributed/tests/test_profile.py fails occasionally (e.g., see #6444 (comment)). This test restructures the test to avoid timing-based flakes.
) distributed/tests/test_profile.py fails occasionally (e.g., see dask#6444 (comment)). This test restructures the test to avoid timing-based flakes.
Closes #xxxx
pre-commit run --all-files