Fix excess threading on missing connections#2403

Merged
mrocklin merged 15 commits into dask:master from danpf:danpf/fix_thread_explosion
Jan 8, 2019

Conversation

@danpf
Contributor

@danpf danpf commented Dec 7, 2018

When you start a worker and there is no scheduler at the location where you expect it to be, the worker continuously generates threads until it hits some limit (~300 for me) and then fails.
The problem is this while True loop.

We can solve this by creating the future outside the loop, and then tracking that single future instead of a new one from each iteration of the while loop.

I *think* this is an okay way to do this, but maybe someone who knows Tornado better can say otherwise.

Possible fix for #2398
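For context, here is a simplified asyncio sketch of the kind of retry loop being discussed (all names here are hypothetical stand-ins, not the actual distributed code): each iteration starts a brand-new connection attempt, sleeps briefly on failure, and only gives up once the deadline passes.

```python
import asyncio

async def connect_with_retry(attempt, timeout=0.1, interval=0.01):
    # Simplified stand-in for the while-True loop: each iteration starts a
    # brand-new connection attempt, sleeps briefly on failure, and only
    # raises once the deadline has passed.
    deadline = asyncio.get_running_loop().time() + timeout
    while True:
        try:
            return await attempt()
        except EnvironmentError:
            if asyncio.get_running_loop().time() >= deadline:
                raise
            await asyncio.sleep(interval)

async def main():
    attempts = []

    async def refused():
        # Simulates "no scheduler at this address".
        attempts.append(1)
        raise ConnectionRefusedError("[Errno 111] Connection refused")

    try:
        await connect_with_retry(refused)
    except EnvironmentError:
        pass
    # Roughly timeout / interval attempts were made -- one per iteration.
    return len(attempts)

print(asyncio.run(main()))
```

If each attempt spawns a thread somewhere below it, the attempt count above is also the thread count, which is the growth this PR is trying to stop.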

@mrocklin
Member

mrocklin commented Dec 7, 2018

Interesting. First, thanks for taking the initiative and submitting this. I appreciate it.

I'm a little curious about that while loop. Under what conditions are we iterating many times? What is the EnvironmentError that you're getting that allows the loop to continue?

I am also a little surprised that calling connector.connect creates a new thread. This is all using Tornado, which should be doing everything concurrently within a single thread using non-blocking sockets.

@danpf
Contributor Author

danpf commented Dec 7, 2018

> I'm a little curious about that while loop. Under what conditions are we iterating many times? What is the EnvironmentError that you're getting that allows the loop to continue?

I'm not entirely sure, but if the IP exists and there's no dask scheduler there, it basically keeps checking until the timeout error, just in case the scheduler is busy responding to something else.

> I am also a little surprised that calling connector.connect creates a new thread. This is all using Tornado, which should be doing everything concurrently within a single thread using non-blocking sockets.

Unfortunately I have no idea. I have never used Tornado before, but it looks like something to do with the way it does:

client = TCPClient()
try:
    stream = yield client.connect(ip, port, max_buffer_size=...

@mrocklin
Member

mrocklin commented Dec 7, 2018

OK, I printed out the error in the except EnvironmentError block, and saw a bunch of these:

<distributed.comm.tcp.TCPConnector object at 0x7ff27414a898>: ConnectionRefusedError: [Errno 111] Connection refused

Given the sleep that's there it looks like we're trying this every 10ms. I'm not surprised that this is somewhat challenging on the system, but I am surprised that it is manifesting as too many threads.

However, this also means that once that connection fails, we will want to start a new one, so I'm not sure that we can pull the connect call outside of the while loop. If you print out the future in that loop you'll find that it has failed. The while loop becomes somewhat useless at that point: it just keeps waiting on a failed future.

Things still work for you because at some point this function stops entirely, and something above it tries it again.

@danpf
Contributor Author

danpf commented Dec 13, 2018

@mrocklin
Here's why it's making threads:

  File "/home/danpf/git/distributed/distributed/core.py", line 573, in send_recv_from_rpc      
    comm = yield self.live_comm()              
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper                                                                        
    yielded = next(result)                     
  File "/home/danpf/git/distributed/distributed/core.py", line 545, in live_comm               
    connection_args=self.connection_args)      
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper                                                                        
    yielded = next(result)                     
  File "/home/danpf/git/distributed/distributed/comm/core.py", line 182, in connect            
    **(connection_args or {}))                 
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper                                                                        
    yielded = next(result)                     
  File "/home/danpf/git/distributed/distributed/comm/tcp.py", line 331, in connect             
    **kwargs)                                  
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper                                                                        
    yielded = next(result)                     
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/tcpclient.py", line 226, in connect                                                                  
    addrinfo = yield self.resolver.resolve(host, port, af)                                     
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/gen.py", line 326, in wrapper                                                                        
    yielded = next(result)                     
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/netutil.py", line 378, in resolve                                                                    
    None, _resolve_addr, host, port, family)   

>        result = yield IOLoop.current().run_in_executor(
>            None, _resolve_addr, host, port, family)

  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 166, in run_in_executor                                                   
    return self.asyncio_loop.run_in_executor(executor, func, *args)                            

  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 737, in run_in_executor                                                                      
    executor.submit(func, *args), loop=self)   

>    def run_in_executor(self, executor, func, *args):
>        print("run in executor", executor)
>        self._check_closed()
>        if self._debug:
>            self._check_callback(func, 'run_in_executor')
>        if executor is None:
>            executor = self._default_executor
>            if executor is None:
>                executor = concurrent.futures.ThreadPoolExecutor()
>                self._default_executor = executor
>        return futures.wrap_future(
>            executor.submit(func, *args), loop=self)

  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/concurrent/futures/thread.py", line 160, in submit                                                                         
    self._adjust_thread_count()                
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/concurrent/futures/thread.py", line 179, in _adjust_thread_count                                                           
    self._initargs))                           
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/threading.py", line 784, in __init__                                                                                       
    traceback.print_stack()                    

The solution (I think) is to somehow hook distributed.comm.tcp.BaseTCPConnector.connect() up with an argument that gives the TCPClient init call your own (global, or the worker's/scheduler's) ThreadPoolExecutor that respects our desired number of threads. It will need its own submit function (maybe it already has one?).
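As a hedged illustration of that idea, using plain asyncio rather than the actual distributed/tornado code: the event loop's default executor (what `run_in_executor(None, ...)` falls back to) can be replaced with a bounded pool, so repeated blocking DNS lookups reuse a fixed set of threads instead of growing without bound.

```python
import asyncio
import socket
from concurrent.futures import ThreadPoolExecutor

async def resolve(host, port):
    # Mirrors what tornado's netutil resolver does: push the blocking
    # getaddrinfo call onto the loop's default executor (executor=None).
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, socket.getaddrinfo, host, port)

async def main():
    loop = asyncio.get_running_loop()
    # Cap the default executor at 2 threads, so repeated lookups reuse the
    # same small pool instead of spawning new threads per call.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
    results = [await resolve("localhost", 80) for _ in range(10)]
    return all(len(r) > 0 for r in results)

print(asyncio.run(main()))
```

This is only a sketch of the mechanism; plumbing such an executor down through distributed and Tornado is the part discussed below.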

@mrocklin
Member

Hrm, that's really interesting. Thank you for spending the effort to track that down.

It looks like that code should create and reuse a single ThreadPoolExecutor (it seems to be attaching it to the IOLoop for future reuse). I'm surprised that it seems to be leaking threads. One thing you could do here if you were interested is to verify that it continues to create executors, and if so why, and then raise that upstream to core CPython (if indeed you find that it is a bug).

My guess is that it will be difficult to pass a ThreadPoolExecutor down from Dask, through Tornado, to Asyncio (indeed it looks as though Tornado is hard-coding the executor argument to None).

You might also want to try Tornado master to see if they have resolved the problem upstream.

@danpf
Contributor Author

danpf commented Dec 15, 2018

This new implementation __might__ be a possible option. Some tests still fail, though, so it is probably filling up the thread queue so much that it isn't able to get through all of them.

@mrocklin
Member

Those errors might also be intermittent (there are unfortunately a few of these).

I don't know enough about tornado internals to be able to judge this PR effectively. Do you have any thoughts on how hard it would be to add a test for this behavior? Presumably it would call connect on an address that needed resolving, wait for a while, and then check that there were not significantly more threads than when we started.
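One possible shape for such a test, sketched with plain asyncio and hypothetical names rather than the actual distributed test harness: run many short blocking jobs through a bounded default executor and check that the thread count stays near where it started rather than growing per job.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

async def thread_growth(n_jobs=50):
    # Run many short blocking jobs through a bounded default executor and
    # report how many extra threads exist afterwards.  With a fix in place
    # this should be a small constant, not one thread per job.
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=2))
    before = threading.active_count()
    for _ in range(n_jobs):
        await loop.run_in_executor(None, lambda: None)
    return threading.active_count() - before

growth = asyncio.run(thread_growth())
print(growth)
```

Comparing against the count at the start of the test (rather than an absolute number) also addresses the concern below about other tests leaking threads.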

@danpf
Contributor Author

danpf commented Dec 20, 2018

convert_yielded appears to be the same as ensure_future, which basically means "start this ASAP, but don't block on it right now; you just have to make sure you yield/await it later."
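The asyncio analogue of that behaviour (convert_yielded is tornado-specific; ensure_future is shown here as the equivalent the comment mentions) looks like this:

```python
import asyncio

async def work(log):
    # Record that we started before yielding control back to the loop.
    log.append("started")
    await asyncio.sleep(0)
    return "done"

async def main():
    log = []
    fut = asyncio.ensure_future(work(log))  # scheduled to start right away
    await asyncio.sleep(0)                  # give the loop one turn
    assert log == ["started"]               # it ran without being awaited
    return await fut                        # collect the result later

print(asyncio.run(main()))
```

The key point is the same one made above: wrapping the coroutine schedules it immediately; the later await only collects the result.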

previously on an 8 core machine:

E       AssertionError: DONE 42
E       assert 42 == 3

now the test passes.

from the tornado mailing list:
https://groups.google.com/forum/#!topic/python-tornado/qfHTGOjIUuo

> This now uses the asyncio "default" executor (for consistency with the rest of the asyncio ecosystem). You can customize this with either IOLoop.set_default_executor or its asyncio equivalent: http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.set_default_executor
>
> Note that the default executor has a high limit, but it does have a limit, so it won't keep spawning threads forever. Last time I checked it defaults to 5 threads per CPU. In general it makes sense to allow more than one thread to perform DNS lookups, although it's hard to say what the best limit is.

So in the future, if this becomes a more complicated problem, it might be necessary to set the loop's default executor to a specified number of threads, but for now allocating 2 threads to comms seems fine... I think 1 would also be fine, but just to be cautious...

Member

@mrocklin mrocklin left a comment


In principle this seems good to me. Thank you for constructing the test. It's very clear.

I've left a couple of small comments about the test, but they're relatively minor so we should be able to merge this soon.

future_connect = gen.convert_yielded(connect("tcp://localhost:28400", 0.6))
yield future_connect
max_thread_count = yield sleep_future
assert max_thread_count == 4
Member

I recommend testing that no more than a few threads were created since starting this test. Otherwise this test will fail if some other test leaks a thread (which does happen with a few of our tests).

return max_thread_count

# tcp.TCPConnector()
sleep_future = gen.convert_yielded(sleep_for_500ms())
Member

I think that you can drop the gen.convert_yielded here. Tornado semantics are a bit different from normal async def semantics: functions decorated with gen.coroutine start running immediately once called.

This is also true below with the future_connect lines. I would merge them into a single yield connect(...) statement.

@mrocklin
Member

mrocklin commented Jan 1, 2019

This looks good to me. Thank you for coming back to this @danpf . Two comments.

  1. The test failures appear to be unrelated, and due to an upstream issue. I'm xfailing them in #2445 (XFail test with resources and collections).
  2. This test takes 1.5s on my machine, which is a bit long for us (we have thousands of tests and like to run them frequently). Is it possible to reduce the time of this test somehow while still making it sensitive to the failure? If not, then we should mark it with the distributed.utils_test.slow decorator so that it doesn't slow down tests except when run with the --runslow flag.

@danpf
Contributor Author

danpf commented Jan 1, 2019

@mrocklin
I reduced it to 0.2s on my computer.
We make an attempt (thread) every 10ms, so 5 attempts should be good enough to prove that it's working.

@mrocklin
Member

mrocklin commented Jan 1, 2019

The Python 2 failure is interesting. I don't have a good idea of what is going on there unfortunately. Do you have any idea about what's going on?

@danpf
Contributor Author

danpf commented Jan 2, 2019

I added one more thread to the TCPClient, and it fixed it (on my machine).

However, I have no idea why. It worries me that this is an incorrect fix that might have broader repercussions, but I'm not sure of alternatives.

On the other hand, I've been using Python 3.7 asynchronous clients (with this patch (2 threads)) for a few weeks now with no problems...

@mrocklin
Member

mrocklin commented Jan 2, 2019

> I've been using python3.7 asynchronous clients (with this patch) for a few weeks now with no problems...

I'm glad to hear it.

> It worries me that this is an incorrect fix that might have broader repercussions, but I'm not sure of alternatives.

I'm not particularly concerned about issues with Python 2. There are other known issues that we just don't fix because Python 2 doesn't have as high a priority. If the CI passes then I'm fine with it.

@danpf
Contributor Author

danpf commented Jan 2, 2019

Darn, hrm, I don't know what to do since it still fails... I haven't used Python 2 in a while so I don't really know what's different...

[screenshot of CI failure]

@mrocklin
Member

mrocklin commented Jan 2, 2019

If I bump the thread count up to 100 then things run smoothly, so maybe there is some issue deep within an old version of Tornado.

My suggestion at this point would be to only assign a client to BaseTCPConnector in Python 3 and leave it as None otherwise. Then do something like client = BaseTCPConnector.client or TCPClient() in the connect coroutine. In Python 3 we'll do the right thing. In Python 2 we'll do previous behavior. This should solve your issue and be fixable in finite time.
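That suggestion could look roughly like the following sketch (FakeClient stands in for tornado's TCPClient here; the names and structure are illustrative, not the merged code):

```python
import sys

class FakeClient(object):
    """Stand-in for tornado.tcpclient.TCPClient in this sketch."""

class BaseTCPConnector(object):
    # Python 3: share one class-level client (and hence one bounded pool).
    # Python 2: leave it as None, falling back to the previous behaviour
    # of creating a fresh client per connect() call.
    client = FakeClient() if sys.version_info[0] >= 3 else None

    def connect(self):
        client = self.client or FakeClient()
        return client

connector = BaseTCPConnector()
# On Python 3, repeated calls reuse the shared class-level client.
print(connector.connect() is connector.connect())
```

The `or` fallback keeps the Python 2 path identical to the old code, which is what makes the whole thing easy to delete once Python 2 support is dropped.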

Eventually when we drop Python 2 we can clear this out.

@danpf
Contributor Author

danpf commented Jan 8, 2019

Thank you for fixing my silly extra inclusion!

@mrocklin mrocklin merged commit 70c5129 into dask:master Jan 8, 2019
@mrocklin
Member

mrocklin commented Jan 8, 2019

Merging.

Thanks for identifying and resolving this issue. I hope that you run into many more so that you can fix them ;)

Also, I notice that this is your first code contribution to this repository. Welcome!
