ENH: avoid oversubscription with nested for loops #690
GaelVaroquaux wants to merge 17 commits into joblib:main
Conversation
Nested for loops can request many, many threads. This leads to oversubscription, which leads to excessive memory use and possibly a fork bomb with threads (joblib#688). The solution I implemented involves two changes:
- Sharing the thread pool across Parallel backends. This requires scaling it when we add parallel instances.
- Falling back to sequential computing when too many Parallel backends are around.
Codecov Report
@@ Coverage Diff @@
## master #690 +/- ##
=========================================
- Coverage 95.01% 94.2% -0.81%
=========================================
Files 40 40
Lines 5694 5744 +50
=========================================
+ Hits 5410 5411 +1
- Misses 284 333 +49
Continue to review full report at Codecov.
ogrisel
left a comment
As soon as travis is happy, I will be happy :)
joblib/_parallel_backends.py
Outdated
else:
    def cpu_count():
        return(1)
style: return 1 (return is still a statement in Python 3 ;)
joblib/_parallel_backends.py
Outdated
-    return self._get_pool().apply_async(
+    out = self._get_pool().apply_async(
         SafeFunction(func), callback=callback)
+    return(out)
Why introduce a local variable here?
joblib/_parallel_backends.py
Outdated
# Don't spawn new threads if there are already many running
# This will fall back to SequentialBackend in the configure
# method
# This is necessary to avoid fork bombs
joblib/_parallel_backends.py
Outdated
if len(_thread_pool_users) > 2 * cpu_count():
    # Don't spawn new threads if there are already many running
    # This will fall back to SequentialBackend in the configure
    # method
global _thread_pool
_thread_pool = None
try:
    _thread_pool_users.remove(self)
I think you are missing a global here?
Also, I don't understand why you terminate the ThreadPool if len(_thread_pool_users) > 1. In this case, you should just remove some threads from it, no?
I don't think the global declaration is required: we are just mutating the value of the _thread_pool_users variable, not assigning it a new value.
joblib/_parallel_backends.py
Outdated
# The code below is accessing multiprocessing private API
max_processes = cpu_count() + len(_thread_pool_users)
if _thread_pool._processes < max_processes:
    _thread_pool._processes = min(max_processes,
PEP 8? It feels hard to parse, as the first argument is not aligned with the second.
Gael Varoquaux

Avoid oversubscription when there are multiple nested parallel loops.
As a result the system avoids fork bombs with recursive parallel
Let's be more specific about this change:
the system avoids thread bombs with ....
This change is not about the "fork" system call or new process creation.
from .externals.loky import process_executor, cpu_count


class SafeThreadPool(ThreadPool):
    " A ThreadPool that can repopulate in a thread safe way."
Style:
class SafeThreadPool(ThreadPool):
"""A ThreadPool that can repopulate in a thread safe way."""
joblib/test/test_parallel.py
Outdated
|
|
def test_fork_bomp():
    # Test that recursive parallelism raises a recursion rather than
    # doing a fork bomp
doing a fork bomb nor a thread bomb.
joblib/test/test_parallel.py
Outdated
# Depending on whether the exception is raised in the main thread
# or in a slave thread and the version of Python, one exception or
# another is raised
with parallel_backend('threading', n_jobs=-1):
It would be even better to test with the default backend: loky would be used as the top level and threads in the nested calls.
I added a new stress test and it caused a deadlock under Windows, as @tomMoral suggested earlier that it would. I am not sure we can fix the design to avoid this. I would rather not postpone the joblib release further for this.
> I would rather not postpone the joblib release further for this.

Can we at least put in place a system that falls back to the sequential
backend when there is too much nesting? Right now it is easy to shoot
oneself in the foot.
Done in #700.
ogrisel
left a comment
We cannot merge this as is (because of the potential deadlocks).
We can probably find a way to better mitigate oversubscription issues, but we should not delay the joblib release over this, as it is a complex problem.
This PR fixes #688. It also avoids a very large memory consumption in the following code (use scikit-learn/scikit-learn#11166 to test in scikit-learn):
import os
os.environ['SKLEARN_SITE_JOBLIB'] = '1'

import joblib
from sklearn import datasets, model_selection, ensemble

data = datasets.fetch_covtype()
X = data.data
y = data.target

rf = ensemble.RandomForestClassifier(n_estimators=100, n_jobs=-1,
                                     verbose=10, max_depth=1)
model = model_selection.GridSearchCV(
    estimator=rf,
    param_grid=dict(max_features=[.1, .2, .3, .4, .5, .6]),
    n_jobs=-1, verbose=10,
)

with joblib.parallel_backend('threading', n_jobs=-1):
    model_selection.cross_val_score(model, X, y, n_jobs=-1, verbose=10)