-
-
Notifications
You must be signed in to change notification settings - Fork 757
Closed
Description
test_oversubscribing_leases had an unrelated failure over in #4240 (full traceback below). Interestingly, it looks like the semaphore was in a state where its client's scheduler (`self.client.scheduler`) was None:
distributed.semaphore - ERROR - Release failed for client=Client-40a75d00-2a5c-11eb-90b8-000d3ae52994, lease_id=361b1423d6d4487c92589adde69d1e83, name=semaphore-c2426d70b6bf4f82afd634110f3950e2. Cluster network might be unstable?
Traceback (most recent call last):
File "D:\a\distributed\distributed\distributed\semaphore.py", line 472, in _release
self.client.scheduler.semaphore_release,
AttributeError: 'NoneType' object has no attribute 'semaphore_release'
Full traceback:
2020-11-19T11:48:45.9658320Z _________________________ test_oversubscribing_leases _________________________
2020-11-19T11:48:45.9658757Z
2020-11-19T11:48:45.9659124Z def test_func():
2020-11-19T11:48:45.9659556Z result = None
2020-11-19T11:48:45.9659939Z workers = []
2020-11-19T11:48:45.9660540Z with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
2020-11-19T11:48:45.9661071Z
2020-11-19T11:48:45.9661465Z async def coro():
2020-11-19T11:48:45.9661957Z with dask.config.set(config):
2020-11-19T11:48:45.9662419Z s = False
2020-11-19T11:48:45.9662849Z for i in range(5):
2020-11-19T11:48:45.9663225Z try:
2020-11-19T11:48:45.9663690Z s, ws = await start_cluster(
2020-11-19T11:48:45.9664140Z nthreads,
2020-11-19T11:48:45.9664606Z scheduler,
2020-11-19T11:48:45.9665005Z loop,
2020-11-19T11:48:45.9665442Z security=security,
2020-11-19T11:48:45.9665954Z Worker=Worker,
2020-11-19T11:48:45.9666537Z scheduler_kwargs=scheduler_kwargs,
2020-11-19T11:48:45.9667166Z worker_kwargs=worker_kwargs,
2020-11-19T11:48:45.9667746Z )
2020-11-19T11:48:45.9668217Z except Exception as e:
2020-11-19T11:48:45.9668707Z logger.error(
2020-11-19T11:48:45.9669299Z "Failed to start gen_cluster, retrying",
2020-11-19T11:48:45.9669822Z exc_info=True,
2020-11-19T11:48:45.9670197Z )
2020-11-19T11:48:45.9670671Z await asyncio.sleep(1)
2020-11-19T11:48:45.9671113Z else:
2020-11-19T11:48:45.9671547Z workers[:] = ws
2020-11-19T11:48:45.9671977Z args = [s] + workers
2020-11-19T11:48:45.9672420Z break
2020-11-19T11:48:45.9672799Z if s is False:
2020-11-19T11:48:45.9673400Z raise Exception("Could not start cluster")
2020-11-19T11:48:45.9673909Z if client:
2020-11-19T11:48:45.9674314Z c = await Client(
2020-11-19T11:48:45.9674776Z s.address,
2020-11-19T11:48:45.9675180Z loop=loop,
2020-11-19T11:48:45.9675667Z security=security,
2020-11-19T11:48:45.9676152Z asynchronous=True,
2020-11-19T11:48:45.9676701Z **client_kwargs,
2020-11-19T11:48:45.9677080Z )
2020-11-19T11:48:45.9677486Z args = [c] + args
2020-11-19T11:48:45.9677849Z try:
2020-11-19T11:48:45.9678245Z future = func(*args)
2020-11-19T11:48:45.9678711Z if timeout:
2020-11-19T11:48:45.9679245Z future = asyncio.wait_for(future, timeout)
2020-11-19T11:48:45.9679844Z result = await future
2020-11-19T11:48:45.9680292Z if s.validate:
2020-11-19T11:48:45.9680779Z s.validate_state()
2020-11-19T11:48:45.9681196Z finally:
2020-11-19T11:48:45.9681699Z if client and c.status not in ("closing", "closed"):
2020-11-19T11:48:45.9682387Z await c._close(fast=s.status == Status.closed)
2020-11-19T11:48:45.9682982Z await end_cluster(s, workers)
2020-11-19T11:48:45.9683636Z await asyncio.wait_for(cleanup_global_workers(), 1)
2020-11-19T11:48:45.9684127Z
2020-11-19T11:48:45.9684484Z try:
2020-11-19T11:48:45.9684899Z c = await default_client()
2020-11-19T11:48:45.9685509Z except ValueError:
2020-11-19T11:48:45.9685955Z pass
2020-11-19T11:48:45.9686354Z else:
2020-11-19T11:48:45.9686826Z await c._close(fast=True)
2020-11-19T11:48:45.9687219Z
2020-11-19T11:48:45.9687635Z def get_unclosed():
2020-11-19T11:48:45.9688193Z return [c for c in Comm._instances if not c.closed()] + [
2020-11-19T11:48:45.9688735Z c
2020-11-19T11:48:45.9689204Z for c in _global_clients.values()
2020-11-19T11:48:45.9689730Z if c.status != "closed"
2020-11-19T11:48:45.9690164Z ]
2020-11-19T11:48:45.9690473Z
2020-11-19T11:48:45.9690833Z try:
2020-11-19T11:48:45.9691202Z start = time()
2020-11-19T11:48:45.9691668Z while time() < start + 5:
2020-11-19T11:48:45.9692107Z gc.collect()
2020-11-19T11:48:45.9692554Z if not get_unclosed():
2020-11-19T11:48:45.9693015Z break
2020-11-19T11:48:45.9693468Z await asyncio.sleep(0.05)
2020-11-19T11:48:45.9694037Z else:
2020-11-19T11:48:45.9694452Z if allow_unclosed:
2020-11-19T11:48:45.9695029Z print(f"Unclosed Comms: {get_unclosed()}")
2020-11-19T11:48:45.9695516Z else:
2020-11-19T11:48:45.9696111Z raise RuntimeError("Unclosed Comms", get_unclosed())
2020-11-19T11:48:45.9696702Z finally:
2020-11-19T11:48:45.9697170Z Comm._instances.clear()
2020-11-19T11:48:45.9697746Z _global_clients.clear()
2020-11-19T11:48:45.9698152Z
2020-11-19T11:48:45.9698545Z return result
2020-11-19T11:48:45.9698905Z
2020-11-19T11:48:45.9699326Z result = loop.run_sync(
2020-11-19T11:48:45.9699888Z > coro, timeout=timeout * 2 if timeout else timeout
2020-11-19T11:48:45.9700359Z )
2020-11-19T11:48:45.9700601Z
2020-11-19T11:48:45.9701056Z distributed\utils_test.py:954:
2020-11-19T11:48:45.9701515Z _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
2020-11-19T11:48:45.9701800Z
2020-11-19T11:48:45.9702985Z self = <tornado.platform.asyncio.AsyncIOLoop object at 0x0000029980440CC0>
2020-11-19T11:48:45.9704194Z func = <function gen_cluster.<locals>._.<locals>.test_func.<locals>.coro at 0x000002998532C158>
2020-11-19T11:48:45.9704901Z timeout = 20
2020-11-19T11:48:45.9705278Z
2020-11-19T11:48:45.9705821Z def run_sync(self, func, timeout=None):
2020-11-19T11:48:45.9707836Z """Starts the `IOLoop`, runs the given function, and stops the loop.
2020-11-19T11:48:45.9708537Z
2020-11-19T11:48:45.9709224Z The function must return either an awaitable object or
2020-11-19T11:48:45.9710034Z ``None``. If the function returns an awaitable object, the
2020-11-19T11:48:45.9710866Z `IOLoop` will run until the awaitable is resolved (and
2020-11-19T11:48:45.9711668Z `run_sync()` will return the awaitable's result). If it raises
2020-11-19T11:48:45.9712544Z an exception, the `IOLoop` will stop and the exception will be
2020-11-19T11:48:45.9713274Z re-raised to the caller.
2020-11-19T11:48:45.9713825Z
2020-11-19T11:48:45.9714440Z The keyword-only argument ``timeout`` may be used to set
2020-11-19T11:48:45.9715324Z a maximum duration for the function. If the timeout expires,
2020-11-19T11:48:45.9716193Z a `tornado.util.TimeoutError` is raised.
2020-11-19T11:48:45.9716959Z
2020-11-19T11:48:45.9717573Z This method is useful to allow asynchronous calls in a
2020-11-19T11:48:45.9718338Z ``main()`` function::
2020-11-19T11:48:45.9719337Z
2020-11-19T11:48:45.9719975Z async def main():
2020-11-19T11:48:45.9720492Z # do stuff...
2020-11-19T11:48:45.9720987Z
2020-11-19T11:48:45.9721450Z if __name__ == '__main__':
2020-11-19T11:48:45.9722108Z IOLoop.current().run_sync(main)
2020-11-19T11:48:45.9722647Z
2020-11-19T11:48:45.9723160Z .. versionchanged:: 4.3
2020-11-19T11:48:45.9723948Z Returning a non-``None``, non-awaitable value is now an error.
2020-11-19T11:48:45.9724582Z
2020-11-19T11:48:45.9725183Z .. versionchanged:: 5.0
2020-11-19T11:48:45.9725923Z If a timeout occurs, the ``func`` coroutine will be cancelled.
2020-11-19T11:48:45.9726901Z
2020-11-19T11:48:45.9727322Z """
2020-11-19T11:48:45.9727859Z future_cell = [None]
2020-11-19T11:48:45.9728330Z
2020-11-19T11:48:45.9728874Z def run():
2020-11-19T11:48:45.9729353Z try:
2020-11-19T11:48:45.9729888Z result = func()
2020-11-19T11:48:45.9730428Z if result is not None:
2020-11-19T11:48:45.9731133Z from tornado.gen import convert_yielded
2020-11-19T11:48:45.9731840Z result = convert_yielded(result)
2020-11-19T11:48:45.9732608Z except Exception:
2020-11-19T11:48:45.9733245Z future_cell[0] = Future()
2020-11-19T11:48:45.9733902Z future_set_exc_info(future_cell[0], sys.exc_info())
2020-11-19T11:48:45.9734548Z else:
2020-11-19T11:48:45.9735103Z if is_future(result):
2020-11-19T11:48:45.9735712Z future_cell[0] = result
2020-11-19T11:48:45.9736238Z else:
2020-11-19T11:48:45.9736892Z future_cell[0] = Future()
2020-11-19T11:48:45.9737508Z future_cell[0].set_result(result)
2020-11-19T11:48:45.9738297Z self.add_future(future_cell[0], lambda future: self.stop())
2020-11-19T11:48:45.9739000Z self.add_callback(run)
2020-11-19T11:48:45.9739616Z if timeout is not None:
2020-11-19T11:48:45.9740185Z def timeout_callback():
2020-11-19T11:48:45.9740906Z # If we can cancel the future, do so and wait on it. If not,
2020-11-19T11:48:45.9741701Z # Just stop the loop and return with the task still pending.
2020-11-19T11:48:45.9742532Z # (If we neither cancel nor wait for the task, a warning
2020-11-19T11:48:45.9743175Z # will be logged).
2020-11-19T11:48:45.9744221Z if not future_cell[0].cancel():
2020-11-19T11:48:45.9744933Z self.stop()
2020-11-19T11:48:45.9745795Z timeout_handle = self.add_timeout(self.time() + timeout, timeout_callback)
2020-11-19T11:48:45.9746672Z self.start()
2020-11-19T11:48:45.9747274Z if timeout is not None:
2020-11-19T11:48:45.9747918Z self.remove_timeout(timeout_handle)
2020-11-19T11:48:45.9748716Z if future_cell[0].cancelled() or not future_cell[0].done():
2020-11-19T11:48:45.9749571Z > raise TimeoutError('Operation timed out after %s seconds' % timeout)
2020-11-19T11:48:45.9750623Z E tornado.util.TimeoutError: Operation timed out after 20 seconds
2020-11-19T11:48:45.9751309Z
2020-11-19T11:48:45.9752143Z C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:575: TimeoutError
2020-11-19T11:48:45.9753252Z ---------------------------- Captured stderr call -----------------------------
2020-11-19T11:48:45.9754116Z distributed.worker - WARNING - Compute Failed
2020-11-19T11:48:45.9754865Z Function: execute_task
2020-11-19T11:48:45.9756244Z args: ((<function apply at 0x00000299B570BA60>, <function test_oversubscribing_leases.<locals>.guaranteed_lease_timeout at 0x0000029985879730>, (<class 'tuple'>, [0]), {'sem': <distributed.semaphore.Semaphore object at 0x0000029984C24860>}))
2020-11-19T11:48:45.9760612Z kwargs: {}
2020-11-19T11:48:45.9761499Z Exception: CommClosedError('in <closed TCP>: Stream is closed',)
2020-11-19T11:48:45.9762164Z
2020-11-19T11:48:45.9762866Z distributed.worker - WARNING - Compute Failed
2020-11-19T11:48:45.9763590Z Function: observe_state
2020-11-19T11:48:45.9764164Z args: ()
2020-11-19T11:48:45.9765013Z kwargs: {'sem': <distributed.semaphore.Semaphore object at 0x0000029984F630F0>}
2020-11-19T11:48:45.9766668Z Exception: AttributeError("'NoneType' object has no attribute 'semaphore_value'",)
2020-11-19T11:48:45.9767413Z
2020-11-19T11:48:45.9768048Z distributed.utils - ERROR -
2020-11-19T11:48:45.9768707Z Traceback (most recent call last):
2020-11-19T11:48:45.9769594Z File "D:\a\distributed\distributed\distributed\utils.py", line 655, in log_errors
2020-11-19T11:48:45.9770328Z yield
2020-11-19T11:48:45.9771148Z File "D:\a\distributed\distributed\distributed\worker.py", line 1210, in close
2020-11-19T11:48:45.9771954Z await self.rpc.close()
2020-11-19T11:48:45.9772927Z File "D:\a\distributed\distributed\distributed\core.py", line 1107, in close
2020-11-19T11:48:45.9773828Z *[comm.close() for comm in comms], return_exceptions=True
2020-11-19T11:48:45.9774729Z concurrent.futures._base.CancelledError
2020-11-19T11:48:45.9777360Z distributed.semaphore - ERROR - Release failed for client=Client-40a75d00-2a5c-11eb-90b8-000d3ae52994, lease_id=361b1423d6d4487c92589adde69d1e83, name=semaphore-c2426d70b6bf4f82afd634110f3950e2. Cluster network might be unstable?
2020-11-19T11:48:45.9780071Z Traceback (most recent call last):
2020-11-19T11:48:45.9780850Z File "D:\a\distributed\distributed\distributed\semaphore.py", line 472, in _release
2020-11-19T11:48:45.9781841Z self.client.scheduler.semaphore_release,
2020-11-19T11:48:45.9939281Z AttributeError: 'NoneType' object has no attribute 'semaphore_release'
2020-11-19T11:48:45.9940682Z ------------------------------ Captured log call ------------------------------
2020-11-19T11:48:45.9941500Z ERROR asyncio:base_events.py:1296 Task was destroyed but it is pending!
2020-11-19T11:48:45.9943619Z task: <Task pending coro=<Worker.execute() running at D:\a\distributed\distributed\distributed\worker.py:2545> wait_for=<Future finished result={'actual-exception': AttributeErro...t_metadata'",), 'exception': <Serialize: '...get_metadata'>, 'op': 'task-erred', 'start': 1605786127.8403795, ...}> cb=[IOLoop.add_future.<locals>.<lambda>() at C:\Miniconda3\envs\dask-distributed\lib\site-packages\tornado\ioloop.py:719]>
cc @fjetter for visibility
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels