Wait on single connection or try multiple connections #3084
Description
When we connect to a remote address we currently wait on a single attempt for our full timeout, often ten seconds, something like this:

    comm = await connect(address, timeout="10s")

However, @jacobtomlinson and I just ran into a situation with Kubernetes where the address that we were connecting to was created at just about the same time. When we first tried to connect we were sent somewhere that would never receive the connection, but if we try again a second later, things are fine.

    comm = await connect(address, timeout="10s")  # this hangs for 10s

    for i in range(10):  # this connects after 1s
        with ignoring(TimeoutError):
            comm = await connect(address, timeout="1s")
            break  # stop retrying once an attempt succeeds

This seems to work because, presumably, after the first connection fails and we try reconnecting the network now routes us to the correct location.
In general this second approach seems more robust to networks that might be fiddled with on-the-fly, which is presumably more common in cloud and Kubernetes situations. However, it also means that we need to become better about cleaning up missed connections.
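To make the trade-off concrete, here is a minimal sketch of the second approach written against plain `asyncio` rather than the actual `distributed` internals. The names `connect_with_retries`, `connect_once`, `attempt_timeout`, and `retry_delay` are hypothetical and only for illustration. It assumes each attempt can be cancelled cleanly: `asyncio.wait_for` cancels the pending attempt when its short timeout expires, which is one possible way to clean up missed connections.

```python
import asyncio
import time


async def connect_with_retries(
    connect_once, address, timeout=10.0, attempt_timeout=1.0, retry_delay=0.01
):
    """Retry short connection attempts until one succeeds or `timeout` elapses.

    `connect_once` is a hypothetical coroutine function that takes an address
    and returns a connection object; it stands in for the real connector.
    """
    deadline = time.monotonic() + timeout
    last_error = None
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"could not connect to {address!r} within {timeout}s"
            ) from last_error
        try:
            # wait_for cancels the in-flight attempt if it exceeds the
            # per-attempt timeout, so a hung attempt is not left behind.
            return await asyncio.wait_for(
                connect_once(address), timeout=min(attempt_timeout, remaining)
            )
        except (asyncio.TimeoutError, OSError) as exc:
            # A hung or refused attempt: remember the error and try again.
            last_error = exc
            await asyncio.sleep(retry_delay)
```

With `timeout=10.0` and `attempt_timeout=1.0` this mirrors the loop above: an attempt that was routed to a dead location is abandoned after about a second, and the next attempt gets a fresh chance at the correct route, while the overall deadline is still respected.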
cc @jcrist @jacobtomlinson and @mmccarty
The actual code for this is here:
distributed/distributed/comm/core.py
Lines 205 to 228 in 549660e
    # This starts a thread
    while True:
        try:
            future = connector.connect(
                loc, deserialize=deserialize, **(connection_args or {})
            )
            comm = await gen.with_timeout(
                timedelta(seconds=deadline - time()),
                future,
                quiet_exceptions=EnvironmentError,
            )
        except FatalCommClosedError:
            raise
        except EnvironmentError as e:
            error = str(e)
            if time() < deadline:
                await gen.sleep(0.01)
                logger.debug("sleeping on connect")
            else:
                _raise(error)
        except gen.TimeoutError:
            _raise(error)
        else:
            break