Conversation
fc42dbd to
e5a8ca3
Compare
|
Besides this currently being broken, this implies that users who actually want the scheduler to be using a different event loop will need to make sure that the scheduler is initialized while running in a coro/task on that loop, correct? IIUC, this is how stdlib asyncio also suggest for their objects (e.g. Lock) to be used. |
|
I'm fine with this in principle. cc'ing @jacobtomlinson to keep him informed. |
|
This seems reasonable to me. Thanks for keeping me up to date. |
|
this needs a fix from #6231 - converting back into draft for now |
Unit Test Results 15 files + 15 15 suites +15 6h 20m 40s ⏱️ + 6h 20m 40s For more details on these failures, see this check. Results for commit 7f5f8ff. ± Comparison against base commit f20f776. ♻️ This comment has been updated with latest results. |
|
this also applies to subclasses of the distributed/distributed/core.py Line 254 in 1346671 import asyncio
import concurrent.futures
import contextlib
import sys
from tornado.ioloop import IOLoop
from distributed import Worker
from distributed.utils import sync
def _run_and_close_tornado(async_fn, /, *args, **kwargs):
loop = None
async def coro():
nonlocal loop
loop = IOLoop.current()
return await async_fn(*args, **kwargs)
try:
asyncio.run(coro())
finally:
loop.close(close_fds=True)
@contextlib.contextmanager
def loop_in_thread():
loop_started = concurrent.futures.Future()
with concurrent.futures.ThreadPoolExecutor(
1, thread_name_prefix="test IOLoop"
) as tpe:
async def run():
io_loop = IOLoop.current()
stop_event = asyncio.Event()
loop_started.set_result((io_loop, stop_event))
await stop_event.wait()
# run asyncio.run in a thread and collect exceptions from *either*
# the loop failing to start, or failing to close
ran = tpe.submit(_run_and_close_tornado, run)
for f in concurrent.futures.as_completed((loop_started, ran)):
if f is loop_started:
io_loop, stop_event = loop_started.result()
try:
yield io_loop
finally:
io_loop.add_callback(stop_event.set)
elif f is ran:
# if this is the first iteration the loop failed to start
# if it's the second iteration the loop has finished or
# the loop failed to close and we need to raise the exception
ran.result()
return
def test_io_loop_in_thread():
async def with_worker_contention(worker):
async def with_worker():
async with worker:
pass
return await asyncio.gather(with_worker(), with_worker())
async def main(loop):
worker = Worker(f"tcp://127.0.0.1:1234", loop=loop)
sync(loop, with_worker_contention, worker=worker)
with loop_in_thread() as loop:
asyncio.run(main(loop))
if __name__ == "__main__":
sys.exit(test_io_loop_in_thread()) |
|
Merged in the other PR. I've marked as ready for review and restarted CI. |
|
This is in. Thanks @graingert |
passing anything other than IOLoop.current() is already unsupported: see dask#6443 (comment)
refs #6163
Passing anything other than
IOLoop.current()will fail when you try to use the Scheduler anyway:self._lock will be bound to
IOLoop.current().asyncio_loopakaasyncio.get_running_loop():distributed/distributed/scheduler.py
Line 2923 in fc42dbd
and start_http_server runs on the IOLoop.current() also:
distributed/distributed/scheduler.py
Line 2963 in fc42dbd
pre-commit run --all-files