
remove remaining run_sync calls from tests #6196

Merged
mrocklin merged 1 commit into dask:main from graingert:remove-remaining-run-sync-calls
Apr 27, 2022

Conversation

@graingert (Member) commented Apr 25, 2022

Follow-up to #6170.

This PR removes the tornado run_sync calls from the bodies of tests.
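To make the shape of the change concrete, here is a minimal sketch in plain asyncio (`gen_test_sketch`, `work`, and the test names are illustrative stand-ins, not distributed's actual helpers) of the old drive-the-coroutine-by-hand pattern next to the decorator-driven style this PR moves the tests to:

```python
import asyncio

# Old style (sketch): a synchronous test body drives each coroutine by
# hand, the way tornado's loop.run_sync() was used inside these tests.
def test_old_style():
    async def work():
        await asyncio.sleep(0)
        return 42

    loop = asyncio.new_event_loop()
    try:
        assert loop.run_until_complete(work()) == 42
    finally:
        loop.close()

# New style (sketch): the test itself is a coroutine, and a decorator
# (distributed uses @gen_test) runs it on a pristine event loop.
def gen_test_sketch(async_fn):
    def wrapper():
        return asyncio.run(async_fn())
    return wrapper

@gen_test_sketch
async def test_new_style():
    await asyncio.sleep(0)
    return 42
```

The second form keeps each test on a fresh, decorator-managed loop, so no `run_sync` calls remain in test bodies.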

github-actions bot (Contributor) commented Apr 25, 2022

Unit Test Results

16 files ±0    16 suites ±0    7h 22m 59s ⏱️ (-23m 53s)
 2 739 tests ±0:  2 659 passed (+2),    80 skipped (-1), 0 failed (-1)
21 797 runs  ±0: 20 753 passed (+7), 1 044 skipped (-6), 0 failed (-1)

Results for commit aa0c493. ± Comparison against base commit 84cbb09.

♻️ This comment has been updated with latest results.

@graingert graingert marked this pull request as ready for review April 25, 2022 19:12
@graingert graingert force-pushed the remove-remaining-run-sync-calls branch 2 times, most recently from 20fab69 to 0d8ffca Compare April 26, 2022 11:02
try:
    res = thread_loop.run_sync(partial(func, *args, **kwargs), timeout=10)
except Exception:
    main_loop.add_callback(fut.set_exc_info, sys.exc_info())
@graingert (Member, author) commented on the diff above:

the set_exc_info method is not available since tornado v5
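Since tornado 5, tornado futures are asyncio futures, which carry `set_exception()` rather than `set_exc_info()`. A hedged sketch of the modern equivalent (`report_failure` and the `RuntimeError` are illustrative, not the PR's actual code): capture the exception instance and schedule `set_exception` onto the owning loop, mirroring what `add_callback(fut.set_exc_info, ...)` used to do:

```python
import asyncio
import sys

# Sketch: hand a caught exception to an asyncio future from outside a
# coroutine, the asyncio replacement for tornado's fut.set_exc_info().
def report_failure(loop, fut):
    try:
        raise RuntimeError("boom")   # stand-in for the failing call
    except Exception:
        exc = sys.exc_info()[1]      # the exception instance itself
        # schedule onto the owning loop, like main_loop.add_callback(...)
        loop.call_soon_threadsafe(fut.set_exception, exc)

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    report_failure(loop, fut)
    try:
        await fut
    except RuntimeError as e:
        return str(e)
```

Awaiting the future then re-raises the stored exception in the coroutine that consumes it.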

@graingert graingert force-pushed the remove-remaining-run-sync-calls branch from 0d8ffca to 75fbbd2 Compare April 26, 2022 12:54
@graingert graingert force-pushed the remove-remaining-run-sync-calls branch from 75fbbd2 to aa0c493 Compare April 26, 2022 13:54
@graingert (Member, author) commented:

=================================== FAILURES ===================================
______________________________ test_lots_of_tasks ______________________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:52907', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:52908', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:52910', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(
        client=True,
        scheduler_kwargs={"dashboard": True},
        config={
            "distributed.scheduler.dashboard.tasks.task-stream-length": 10,
            "distributed.scheduler.dashboard.status.task-stream-length": 10,
        },
    )
    async def test_lots_of_tasks(c, s, a, b):
        import tlz as toolz
    
        ts = TaskStream(s)
        ts.update()
        futures = c.map(toolz.identity, range(100))
        await wait(futures)
    
        tsp = s.plugins[TaskStreamPlugin.name]
        assert len(tsp.buffer) == 10
        ts.update()
>       assert len(ts.source.data["start"]) == 10
E       assert 11 == 10
E        +  where 11 = len(array([ 1.54638672,  0.        ,  0.99829102,  7.20776367,  5.12182617,\n        6.47998047, 83.5144043 , 49.33178711, 79.44384766, 82.61474609,\n       89.00756836]))

distributed/dashboard/tests/test_scheduler_bokeh.py:873: AssertionError
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_lots_of_tasks.yaml
----------------------------- Captured stderr call -----------------------------
2022-04-26 16:02:35,377 - distributed.worker - WARNING - Heartbeat to scheduler failed
Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/runner/work/distributed/distributed/distributed/worker.py", line 1150, in heartbeat
    response = await retry_operation(
  File "/Users/runner/work/distributed/distributed/distributed/utils_comm.py", line 381, in retry_operation
    return await retry(
  File "/Users/runner/work/distributed/distributed/distributed/utils_comm.py", line 366, in retry
    return await coro()
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 945, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/runner/work/distributed/distributed/distributed/core.py", line 709, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/Users/runner/work/distributed/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:52917 remote=tcp://127.0.0.1:52907>: Stream is closed
___________________ test_last_in_first_out[queue on worker] ____________________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:59925', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:59926', name: 0, status: closed, stored: 0, running: 1/1, ready: 2, comm: 0, waiting: 0>
pause = False

    @gen_blockable_cluster
    async def test_last_in_first_out(c, s, a, pause):
        async with block_worker(c, s, a, pause, 15, 5):
            xs = [c.submit(slowinc, i, delay=0.05, key=f"x{i}") for i in range(5)]
            ys = [c.submit(slowinc, xs[i], delay=0.05, key=f"y{i}") for i in range(5)]
            zs = [c.submit(slowinc, ys[i], delay=0.05, key=f"z{i}") for i in range(5)]
    
        while not any(s.tasks[z.key].state == "memory" for z in zs):
            await asyncio.sleep(0.01)
>       assert not all(s.tasks[x.key].state == "memory" for x in xs)
E       assert not True
E        +  where True = all(<generator object test_last_in_first_out.<locals>.<genexpr> at 0x13887e500>)

distributed/tests/test_priorities.py:233: AssertionError

test failures seem unrelated

@graingert graingert requested a review from fjetter April 27, 2022 08:25
@graingert (Member, author) commented Apr 27, 2022

Opened issues for these new failures:

#6215
#6216

@sjperkins (Member) left a comment:

This PR replaces the use of the loop fixture and its associated pristine_loop function with the higher-level @gen_test decorators and cleanup fixtures. As I read the code, these higher-level constructs create pristine loops that are available through the asyncio interface, avoiding the need to call loop.run_sync in the test cases.

be used to determine if it exited correctly."""

async def parent_process_coroutine():
    IOLoop.current()
@sjperkins (Member):

I understand this is to ensure that the IOLoop has been created in this process? This then replaces the former pristine_loop test construct lower down, and standard asyncio is used from that point on?

I think this is fine because the process is explicitly spawned as part of the test and won't have its own IOLoop yet.

@graingert (Member, author) replied:

Yeah, AsyncProcess uses IOLoop.current(instance=False) which fails if a tornado loop hasn't been created.

Calling IOLoop.current() is only deprecated when there isn't a running asyncio loop
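The asyncio analogue of that behaviour can be sketched like this (`current_loop_or_none` is an illustrative helper, not tornado's code): asking for the current loop succeeds inside a running loop, and can be made to return None, rather than implicitly create a loop, outside of one:

```python
import asyncio

# Sketch of the idea behind IOLoop.current(instance=False): report the
# running loop if there is one, otherwise None instead of creating one.
def current_loop_or_none():
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None

async def main():
    # inside a running loop, a current loop is always available
    return current_loop_or_none() is not None
```

Outside a running loop `current_loop_or_none()` returns None, which is why spawning the tornado loop explicitly in the child process is needed.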



def test_All(loop):
@gen_test()
@sjperkins (Member):

@gen_test should take care of providing a pristine IOLoop because it invokes test_All within a clean() context?

@graingert (Member, author) replied:

Yep



def test_retry_no_exception(loop):
def test_retry_no_exception(cleanup):
@sjperkins (Member):

I see the cleanup fixture also yields a pristine loop for sync test functions. It seems gen_test is the counterpart for async functions?

A nice-to-have might be separate gen_test_async and gen_test_sync decorators, but I realise I don't have a lot of context here. This shouldn't block the PR.

@graingert (Member, author) replied:

I'll be removing the call to pristine_loop in cleanup in my next PR
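The sync-test half of the split discussed above can be sketched as a context manager that hands a fresh event loop to a synchronous test and tears it down afterwards (`pristine_loop` here is illustrative, not distributed's actual fixture):

```python
import asyncio
import contextlib

# Sketch of a "pristine loop" helper for synchronous tests: each test
# gets a brand-new event loop that is closed afterwards, so no loop
# state can leak between tests.
@contextlib.contextmanager
def pristine_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        yield loop
    finally:
        loop.close()
        asyncio.set_event_loop(None)

def test_sync_with_loop():
    with pristine_loop() as loop:
        assert loop.run_until_complete(asyncio.sleep(0, result="ok")) == "ok"
```

Async tests get the same isolation from their decorator instead, which is why the sync-side helper can eventually be retired.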

@sjperkins sjperkins self-requested a review April 27, 2022 10:48
@fjetter (Member) commented Apr 27, 2022

I just re-triggered the failing job and we can merge if this turns out to be green

@mrocklin mrocklin merged commit b6a62f8 into dask:main Apr 27, 2022
@graingert graingert deleted the remove-remaining-run-sync-calls branch April 27, 2022 15:43