handle distributed.core.Server startup and shutdown excellently #6616
Status: Open
Labels: asyncio, deprecation, enhancement, stability
Description
Server startup and shutdown are currently confusing and error-prone; see #6615 and a8244bd (#6603).
Patterns involving concurrent and re-entrant close and cancellation are prone to deadlocks:
```python
try:
    await self.comm.read()  # a concurrent close() cancels this task and waits for it to finish
finally:
    await self.close()  # ...but this waits for that close() to finish, so neither can complete
```
For example, in `distributed/distributed/worker.py`, lines 1201 to 1210 at commit bc04d0e:
```python
@fail_hard
async def handle_scheduler(self, comm: Comm) -> None:
    await self.handle_stream(comm)
    logger.info(
        "Connection to scheduler broken. Closing without reporting. ID: %s Address %s Status: %s",
        self.id,
        self.address,
        self.status,
    )
    await self.close()
```
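The deadlock pattern above can be reproduced without `distributed`. The following is a minimal sketch, assuming a hypothetical `ToyServer` whose `close()` is re-entrant in the sense that a second call simply waits for the first one to finish. The reader task stands in for `handle_scheduler`, and `asyncio.wait_for` is used only to detect the hang.

```python
import asyncio


class ToyServer:
    """Hypothetical stand-in for a server with a re-entrant close()."""

    def __init__(self):
        self.closing = False
        self.closed = asyncio.Event()
        self.reader = None

    async def read_loop(self):
        try:
            await asyncio.Event().wait()  # stands in for `await self.comm.read()`
        finally:
            await self.close()  # re-entrant close from the task being cancelled

    async def close(self):
        if self.closing:
            # A close is already in progress: wait for it to finish
            await self.closed.wait()
            return
        self.closing = True
        self.reader.cancel()
        # Wait for read_loop to exit, but read_loop is waiting for us
        await asyncio.gather(self.reader, return_exceptions=True)
        self.closed.set()


async def main():
    server = ToyServer()
    server.reader = asyncio.create_task(server.read_loop())
    await asyncio.sleep(0)  # let the reader start blocking
    try:
        await asyncio.wait_for(server.close(), timeout=0.2)
        return "closed"
    except asyncio.TimeoutError:
        return "deadlock"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Running it prints `deadlock`: the outer `close()` waits for the reader task, while the reader's `finally` block waits for the outer `close()`.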
I think we need a pattern in which only the task that enters `async with Server(...): ...` is allowed to call `await self.finished()` or `await self.close()`.
A sketch:
```python
import asyncio
import warnings


class Server:
    def __init__(self):
        self.__close_done = asyncio.Event()
        self.__start_event = asyncio.Event()
        self.__close_event = asyncio.Event()

    def request_close(self):
        # Synchronous, so any task may call it; also unblocks a pending __aenter__
        self.__start_event.set()
        self.__close_event.set()

    async def __lifecycle(self):
        try:
            async with self.listen(), self.open_rpc_pool():
                try:
                    await self.start()
                    self.__start_event.set()
                    await self.__close_event.wait()
                    await self.close()
                finally:
                    v = self.abort()  # abort comms by calling socket.close()
                    assert v is None  # abort must not be an async def
        finally:
            self.__start_event.set()  # never leave __aenter__ waiting if startup failed
            self.__close_done.set()

    async def __aenter__(self):
        self.__parent_task = asyncio.current_task()
        self.__lifecycle_task = asyncio.create_task(self.__lifecycle())
        await self.__start_event.wait()
        if self.__lifecycle_task.done():
            self.__lifecycle_task.result()  # re-raise an exception from startup
        return self

    def __await__(self):
        warnings.warn(
            "await Server() is deprecated, use async with Server()",
            DeprecationWarning,
            stacklevel=2,
        )
        # ??? some background task magic here

    async def __aexit__(self, *exc_info):
        self.request_close()
        await self.finished()

    async def close(self):
        try:
            # only the lifecycle task may close the server
            assert asyncio.current_task() is self.__lifecycle_task
            # close comms, wait for tasks to cancel
        finally:
            v = self.abort()  # abort comms by calling socket.close()
            assert v is None

    async def finished(self):
        # only the task that entered `async with` may wait for shutdown
        assert asyncio.current_task() is self.__parent_task
        await self.__close_done.wait()
```
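A runnable reduction of the sketch above illustrates the intended ordering. Everything here is hypothetical: `listen()`, `open_rpc_pool()`, `start()` and `abort()` are stubs that only append to a log. `handle_stream()` assumes that a handler which sees a broken connection calls the synchronous `request_close()` instead of awaiting `close()`, so no task ever waits on a shutdown that is itself waiting on that task.

```python
import asyncio
import contextlib


class LifecycleServer:
    """Hypothetical reduction of the single-owner lifecycle sketch.

    listen(), open_rpc_pool(), start() and abort() are stubs that only log.
    """

    def __init__(self):
        self.log = []
        self._start_event = asyncio.Event()
        self._close_event = asyncio.Event()
        self._close_done = asyncio.Event()

    @contextlib.asynccontextmanager
    async def listen(self):
        self.log.append("listen")
        try:
            yield
        finally:
            self.log.append("stop listening")

    @contextlib.asynccontextmanager
    async def open_rpc_pool(self):
        self.log.append("open rpc pool")
        try:
            yield
        finally:
            self.log.append("close rpc pool")

    async def start(self):
        self.log.append("start")

    def abort(self):
        # Synchronous on purpose: would call socket.close() on open comms
        self.log.append("abort")

    def request_close(self):
        # Any task may call this without awaiting anything
        self._start_event.set()
        self._close_event.set()

    async def _lifecycle(self):
        try:
            async with self.listen(), self.open_rpc_pool():
                try:
                    await self.start()
                    self._start_event.set()
                    await self._close_event.wait()
                    self.log.append("close")  # only this task runs shutdown
                finally:
                    self.abort()
        finally:
            self._close_done.set()

    async def __aenter__(self):
        self._task = asyncio.create_task(self._lifecycle())
        await self._start_event.wait()
        return self

    async def __aexit__(self, *exc_info):
        self.request_close()
        await self.finished()

    async def finished(self):
        await self._close_done.wait()

    async def handle_stream(self):
        # Hypothetical handler: on a broken connection it *requests* a close
        await asyncio.sleep(0)
        self.request_close()


async def demo():
    async with LifecycleServer() as server:
        await asyncio.create_task(server.handle_stream())
        # Shutdown completes even though the handler never awaited close()
        await asyncio.wait_for(server.finished(), timeout=1)
    return server.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because only the lifecycle task runs the shutdown steps, the teardown order is fixed: `abort()` runs before the RPC pool and the listener are closed, and the handler returns immediately after requesting the close.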