Skip to content

Flaky test_oversubscribing_leases #4256

@jrbourbeau

Description

@jrbourbeau

test_oversubscribing_leases had an unrelated failure over in #4240 (full traceback below). Interestingly, it looks like the semaphore was in a state where the client's scheduler was None:

distributed.semaphore - ERROR - Release failed for client=Client-40a75d00-2a5c-11eb-90b8-000d3ae52994, lease_id=361b1423d6d4487c92589adde69d1e83, name=semaphore-c2426d70b6bf4f82afd634110f3950e2. Cluster network might be unstable?
Traceback (most recent call last):
  File "D:\a\distributed\distributed\distributed\semaphore.py", line 472, in _release
    self.client.scheduler.semaphore_release,
AttributeError: 'NoneType' object has no attribute 'semaphore_release'
Full traceback:
2020-11-19T11:48:45.9658320Z _________________________ test_oversubscribing_leases _________________________
2020-11-19T11:48:45.9658757Z 
2020-11-19T11:48:45.9659124Z     def test_func():
2020-11-19T11:48:45.9659556Z         result = None
2020-11-19T11:48:45.9659939Z         workers = []
2020-11-19T11:48:45.9660540Z         with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2020-11-19T11:48:45.9661071Z     
2020-11-19T11:48:45.9661465Z             async def coro():
2020-11-19T11:48:45.9661957Z                 with dask.config.set(config):
2020-11-19T11:48:45.9662419Z                     s = False
2020-11-19T11:48:45.9662849Z                     for i in range(5):
2020-11-19T11:48:45.9663225Z                         try:
2020-11-19T11:48:45.9663690Z                             s, ws = await start_cluster(
2020-11-19T11:48:45.9664140Z                                 nthreads,
2020-11-19T11:48:45.9664606Z                                 scheduler,
2020-11-19T11:48:45.9665005Z                                 loop,
2020-11-19T11:48:45.9665442Z                                 security=security,
2020-11-19T11:48:45.9665954Z                                 Worker=Worker,
2020-11-19T11:48:45.9666537Z                                 scheduler_kwargs=scheduler_kwargs,
2020-11-19T11:48:45.9667166Z                                 worker_kwargs=worker_kwargs,
2020-11-19T11:48:45.9667746Z                             )
2020-11-19T11:48:45.9668217Z                         except Exception as e:
2020-11-19T11:48:45.9668707Z                             logger.error(
2020-11-19T11:48:45.9669299Z                                 "Failed to start gen_cluster, retrying",
2020-11-19T11:48:45.9669822Z                                 exc_info=True,
2020-11-19T11:48:45.9670197Z                             )
2020-11-19T11:48:45.9670671Z                             await asyncio.sleep(1)
2020-11-19T11:48:45.9671113Z                         else:
2020-11-19T11:48:45.9671547Z                             workers[:] = ws
2020-11-19T11:48:45.9671977Z                             args = [s] + workers
2020-11-19T11:48:45.9672420Z                             break
2020-11-19T11:48:45.9672799Z                     if s is False:
2020-11-19T11:48:45.9673400Z                         raise Exception("Could not start cluster")
2020-11-19T11:48:45.9673909Z                     if client:
2020-11-19T11:48:45.9674314Z                         c = await Client(
2020-11-19T11:48:45.9674776Z                             s.address,
2020-11-19T11:48:45.9675180Z                             loop=loop,
2020-11-19T11:48:45.9675667Z                             security=security,
2020-11-19T11:48:45.9676152Z                             asynchronous=True,
2020-11-19T11:48:45.9676701Z                             **client_kwargs,
2020-11-19T11:48:45.9677080Z                         )
2020-11-19T11:48:45.9677486Z                         args = [c] + args
2020-11-19T11:48:45.9677849Z                     try:
2020-11-19T11:48:45.9678245Z                         future = func(*args)
2020-11-19T11:48:45.9678711Z                         if timeout:
2020-11-19T11:48:45.9679245Z                             future = asyncio.wait_for(future, timeout)
2020-11-19T11:48:45.9679844Z                         result = await future
2020-11-19T11:48:45.9680292Z                         if s.validate:
2020-11-19T11:48:45.9680779Z                             s.validate_state()
2020-11-19T11:48:45.9681196Z                     finally:
2020-11-19T11:48:45.9681699Z                         if client and c.status not in ("closing", "closed"):
2020-11-19T11:48:45.9682387Z                             await c._close(fast=s.status == Status.closed)
2020-11-19T11:48:45.9682982Z                         await end_cluster(s, workers)
2020-11-19T11:48:45.9683636Z                         await asyncio.wait_for(cleanup_global_workers(), 1)
2020-11-19T11:48:45.9684127Z     
2020-11-19T11:48:45.9684484Z                     try:
2020-11-19T11:48:45.9684899Z                         c = await default_client()
2020-11-19T11:48:45.9685509Z                     except ValueError:
2020-11-19T11:48:45.9685955Z                         pass
2020-11-19T11:48:45.9686354Z                     else:
2020-11-19T11:48:45.9686826Z                         await c._close(fast=True)
2020-11-19T11:48:45.9687219Z     
2020-11-19T11:48:45.9687635Z                     def get_unclosed():
2020-11-19T11:48:45.9688193Z                         return [c for c in Comm._instances if not c.closed()] + [
2020-11-19T11:48:45.9688735Z                             c
2020-11-19T11:48:45.9689204Z                             for c in _global_clients.values()
2020-11-19T11:48:45.9689730Z                             if c.status != "closed"
2020-11-19T11:48:45.9690164Z                         ]
2020-11-19T11:48:45.9690473Z     
2020-11-19T11:48:45.9690833Z                     try:
2020-11-19T11:48:45.9691202Z                         start = time()
2020-11-19T11:48:45.9691668Z                         while time() < start + 5:
2020-11-19T11:48:45.9692107Z                             gc.collect()
2020-11-19T11:48:45.9692554Z                             if not get_unclosed():
2020-11-19T11:48:45.9693015Z                                 break
2020-11-19T11:48:45.9693468Z                             await asyncio.sleep(0.05)
2020-11-19T11:48:45.9694037Z                         else:
2020-11-19T11:48:45.9694452Z                             if allow_unclosed:
2020-11-19T11:48:45.9695029Z                                 print(f"Unclosed Comms: {get_unclosed()}")
2020-11-19T11:48:45.9695516Z                             else:
2020-11-19T11:48:45.9696111Z                                 raise RuntimeError("Unclosed Comms", get_unclosed())
2020-11-19T11:48:45.9696702Z                     finally:
2020-11-19T11:48:45.9697170Z                         Comm._instances.clear()
2020-11-19T11:48:45.9697746Z                         _global_clients.clear()
2020-11-19T11:48:45.9698152Z     
2020-11-19T11:48:45.9698545Z                     return result
2020-11-19T11:48:45.9698905Z     
2020-11-19T11:48:45.9699326Z             result = loop.run_sync(
2020-11-19T11:48:45.9699888Z >               coro, timeout=timeout * 2 if timeout else timeout
2020-11-19T11:48:45.9700359Z             )
2020-11-19T11:48:45.9700601Z 
2020-11-19T11:48:45.9701056Z distributed\utils_test.py:954: 
2020-11-19T11:48:45.9701515Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-11-19T11:48:45.9701800Z 
2020-11-19T11:48:45.9702985Z self = <tornado.platform.asyncio.AsyncIOLoop object at 0x0000029980440CC0>
2020-11-19T11:48:45.9704194Z func = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x000002998532C158>
2020-11-19T11:48:45.9704901Z timeout = 20
2020-11-19T11:48:45.9705278Z 
2020-11-19T11:48:45.9705821Z     def run_sync(self, func, timeout=None):
2020-11-19T11:48:45.9707836Z         """Starts the `IOLoop`, runs the given function, and stops the loop.
2020-11-19T11:48:45.9708537Z     
2020-11-19T11:48:45.9709224Z         The function must return either an awaitable object or
2020-11-19T11:48:45.9710034Z         ``None``. If the function returns an awaitable object, the
2020-11-19T11:48:45.9710866Z         `IOLoop` will run until the awaitable is resolved (and
2020-11-19T11:48:45.9711668Z         `run_sync()` will return the awaitable's result). If it raises
2020-11-19T11:48:45.9712544Z         an exception, the `IOLoop` will stop and the exception will be
2020-11-19T11:48:45.9713274Z         re-raised to the caller.
2020-11-19T11:48:45.9713825Z     
2020-11-19T11:48:45.9714440Z         The keyword-only argument ``timeout`` may be used to set
2020-11-19T11:48:45.9715324Z         a maximum duration for the function.  If the timeout expires,
2020-11-19T11:48:45.9716193Z         a `tornado.util.TimeoutError` is raised.
2020-11-19T11:48:45.9716959Z     
2020-11-19T11:48:45.9717573Z         This method is useful to allow asynchronous calls in a
2020-11-19T11:48:45.9718338Z         ``main()`` function::
2020-11-19T11:48:45.9719337Z     
2020-11-19T11:48:45.9719975Z             async def main():
2020-11-19T11:48:45.9720492Z                 # do stuff...
2020-11-19T11:48:45.9720987Z     
2020-11-19T11:48:45.9721450Z             if __name__ == '__main__':
2020-11-19T11:48:45.9722108Z                 IOLoop.current().run_sync(main)
2020-11-19T11:48:45.9722647Z     
2020-11-19T11:48:45.9723160Z         .. versionchanged:: 4.3
2020-11-19T11:48:45.9723948Z            Returning a non-``None``, non-awaitable value is now an error.
2020-11-19T11:48:45.9724582Z     
2020-11-19T11:48:45.9725183Z         .. versionchanged:: 5.0
2020-11-19T11:48:45.9725923Z            If a timeout occurs, the ``func`` coroutine will be cancelled.
2020-11-19T11:48:45.9726901Z     
2020-11-19T11:48:45.9727322Z         """
2020-11-19T11:48:45.9727859Z         future_cell = [None]
2020-11-19T11:48:45.9728330Z     
2020-11-19T11:48:45.9728874Z         def run():
2020-11-19T11:48:45.9729353Z             try:
2020-11-19T11:48:45.9729888Z                 result = func()
2020-11-19T11:48:45.9730428Z                 if result is not None:
2020-11-19T11:48:45.9731133Z                     from tornado.gen import convert_yielded
2020-11-19T11:48:45.9731840Z                     result = convert_yielded(result)
2020-11-19T11:48:45.9732608Z             except Exception:
2020-11-19T11:48:45.9733245Z                 future_cell[0] = Future()
2020-11-19T11:48:45.9733902Z                 future_set_exc_info(future_cell[0], sys.exc_info())
2020-11-19T11:48:45.9734548Z             else:
2020-11-19T11:48:45.9735103Z                 if is_future(result):
2020-11-19T11:48:45.9735712Z                     future_cell[0] = result
2020-11-19T11:48:45.9736238Z                 else:
2020-11-19T11:48:45.9736892Z                     future_cell[0] = Future()
2020-11-19T11:48:45.9737508Z                     future_cell[0].set_result(result)
2020-11-19T11:48:45.9738297Z             self.add_future(future_cell[0], lambda future: self.stop())
2020-11-19T11:48:45.9739000Z         self.add_callback(run)
2020-11-19T11:48:45.9739616Z         if timeout is not None:
2020-11-19T11:48:45.9740185Z             def timeout_callback():
2020-11-19T11:48:45.9740906Z                 # If we can cancel the future, do so and wait on it. If not,
2020-11-19T11:48:45.9741701Z                 # Just stop the loop and return with the task still pending.
2020-11-19T11:48:45.9742532Z                 # (If we neither cancel nor wait for the task, a warning
2020-11-19T11:48:45.9743175Z                 # will be logged).
2020-11-19T11:48:45.9744221Z                 if not future_cell[0].cancel():
2020-11-19T11:48:45.9744933Z                     self.stop()
2020-11-19T11:48:45.9745795Z             timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
2020-11-19T11:48:45.9746672Z         self.start()
2020-11-19T11:48:45.9747274Z         if timeout is not None:
2020-11-19T11:48:45.9747918Z             self.remove_timeout(timeout_handle)
2020-11-19T11:48:45.9748716Z         if future_cell[0].cancelled() or not future_cell[0].done():
2020-11-19T11:48:45.9749571Z >           raise TimeoutError('Operation timed out after %s seconds' % timeout)
2020-11-19T11:48:45.9750623Z E           tornado.util.TimeoutError: Operation timed out after 20 seconds
2020-11-19T11:48:45.9751309Z 
2020-11-19T11:48:45.9752143Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:575: TimeoutError
2020-11-19T11:48:45.9753252Z ---------------------------- Captured stderr call -----------------------------
2020-11-19T11:48:45.9754116Z distributed.worker - WARNING -  Compute Failed
2020-11-19T11:48:45.9754865Z Function:  execute_task
2020-11-19T11:48:45.9756244Z args:      ((<function apply at 0x00000299B570BA60>, <function test_oversubscribing_leases.<locals>.guaranteed_lease_timeout at 0x0000029985879730>, (<class 'tuple'>, [0]), {'sem': <distributed.semaphore.Semaphore object at 0x0000029984C24860>}))
2020-11-19T11:48:45.9760612Z kwargs:    {}
2020-11-19T11:48:45.9761499Z Exception: CommClosedError('in <closed TCP>: Stream is closed',)
2020-11-19T11:48:45.9762164Z 
2020-11-19T11:48:45.9762866Z distributed.worker - WARNING -  Compute Failed
2020-11-19T11:48:45.9763590Z Function:  observe_state
2020-11-19T11:48:45.9764164Z args:      ()
2020-11-19T11:48:45.9765013Z kwargs:    {'sem': <distributed.semaphore.Semaphore object at 0x0000029984F630F0>}
2020-11-19T11:48:45.9766668Z Exception: AttributeError("'NoneType' object has no attribute 'semaphore_value'",)
2020-11-19T11:48:45.9767413Z 
2020-11-19T11:48:45.9768048Z distributed.utils - ERROR - 
2020-11-19T11:48:45.9768707Z Traceback (most recent call last):
2020-11-19T11:48:45.9769594Z   File "D:\a\distributed\distributed\distributed\utils.py", line 655, in log_errors
2020-11-19T11:48:45.9770328Z     yield
2020-11-19T11:48:45.9771148Z   File "D:\a\distributed\distributed\distributed\worker.py", line 1210, in close
2020-11-19T11:48:45.9771954Z     await self.rpc.close()
2020-11-19T11:48:45.9772927Z   File "D:\a\distributed\distributed\distributed\core.py", line 1107, in close
2020-11-19T11:48:45.9773828Z     *[comm.close() for comm in comms], return_exceptions=True
2020-11-19T11:48:45.9774729Z concurrent.futures._base.CancelledError
2020-11-19T11:48:45.9777360Z distributed.semaphore - ERROR - Release failed for client=Client-40a75d00-2a5c-11eb-90b8-000d3ae52994, lease_id=361b1423d6d4487c92589adde69d1e83, name=semaphore-c2426d70b6bf4f82afd634110f3950e2. Cluster network might be unstable?
2020-11-19T11:48:45.9780071Z Traceback (most recent call last):
2020-11-19T11:48:45.9780850Z   File "D:\a\distributed\distributed\distributed\semaphore.py", line 472, in _release
2020-11-19T11:48:45.9781841Z     self.client.scheduler.semaphore_release,
2020-11-19T11:48:45.9939281Z AttributeError: 'NoneType' object has no attribute 'semaphore_release'
2020-11-19T11:48:45.9940682Z ------------------------------ Captured log call ------------------------------
2020-11-19T11:48:45.9941500Z ERROR    asyncio:base_events.py:1296 Task was destroyed but it is pending!
2020-11-19T11:48:45.9943619Z task: <Task pending coro=<Worker.execute() running at D:\a\distributed\distributed\distributed\worker.py:2545> wait_for=<Future finished result={'actual-exception': AttributeErro...t_metadata'",), 'exception': <Serialize: '...get_metadata'>, 'op': 'task-erred', 'start': 1605786127.8403795, ...}> cb=[IOLoop.add_future.<locals>.<lambda>() at C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:719]>

cc @fjetter for visibility

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions