Handle nested parallelism in distributed.joblib#1705
TomAugspurger merged 6 commits into dask:master
Conversation
This implements the new API introduced in joblib/joblib#538 to ensure nested parallelism works correctly. This breaks the API, as previously `DaskDistributedBackend` had a `client` attribute holding the `Client` that was used. Nested parallelism means workers creating a `DaskDistributedBackend` with no `scheduler_host`. To avoid deadlocks, worker threads will secede and rejoin.
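To see why seceding matters, here is a stdlib-only sketch (not dask code) of the deadlock a fixed-size worker pool hits when a task blocks on nested work, and how freeing up capacity before blocking avoids it:

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)

def inner():
    return 42

def outer_deadlocking():
    # With only one worker thread, this would block forever: the sole
    # worker is stuck in .result() and can never run inner().
    return pool.submit(inner).result()

def outer_seceding():
    # Rough analogue of seceding: hand the nested work to capacity
    # outside the blocked pool, so the wait cannot starve it.
    with ThreadPoolExecutor(max_workers=1) as helper:
        return helper.submit(inner).result()

print(pool.submit(outer_seceding).result())  # prints 42
```

In distributed, secede() frees the slot in-place (a replacement thread joins the pool) rather than spinning up a helper pool, but the starvation problem being avoided is the same.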
I wonder if maybe we should just use the default client.
try:
    rejoin()
except AttributeError:
    pass
Should we rejoin here? Typically this is necessary if there is additional computational work to do in the task. That may not be the case here. It's harmless either way, but might add a tiny bit of unwanted delay.
I wasn't sure about that either. To make sure I understand, when the worker thread secedes, the scheduler will tell the worker to make a new thread to take its place? In that case, yes we should be OK just not rejoining.
Right, the thread leaves the thread pool, leaving it with n - 1 threads. We make a new thread to take its place. Relevant code here: https://github.com/dask/distributed/blob/master/distributed/threadpoolexecutor.py#L94-L105
def secede(adjust=True):
    """ Have this thread secede from the ThreadPoolExecutor

    See Also
    --------
    rejoin: rejoin the thread pool
    """
    thread_state.proceed = False
    with threads_lock:
        thread_state.executor._threads.remove(threading.current_thread())
        if adjust:
            thread_state.executor._adjust_thread_count()

if joblib is None or LooseVersion(joblib.__version__) <= "0.11.0":
    pytest.skip("Joblib >= 0.11.1 required.")
Parallel = joblib.Parallel
delayed = joblib.delayed
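The replacement-thread behavior of the secede implementation quoted above can be imitated with a small stdlib-only toy (the names here are illustrative, not the real distributed.threadpoolexecutor API):

```python
import threading

_threads_lock = threading.Lock()

class ToyPool:
    """Tracks its worker threads; a seceding thread drops out and is
    replaced, so the pool keeps its full strength."""
    def __init__(self):
        self._threads = set()

    def _adjust_thread_count(self):
        # Spawn a replacement so the tracked pool size stays constant.
        t = threading.Thread(target=lambda: None)
        self._threads.add(t)
        t.start()

def toy_secede(pool, adjust=True):
    # Remove the calling thread from the pool's bookkeeping, then
    # optionally spawn a replacement (mirroring secede(adjust=True)).
    with _threads_lock:
        pool._threads.discard(threading.current_thread())
        if adjust:
            pool._adjust_thread_count()

pool = ToyPool()
worker = threading.Thread(target=toy_secede, args=(pool,))
pool._threads.add(worker)
worker.start()
worker.join()
assert len(pool._threads) == 1  # the seceded worker was replaced
```

The real implementation additionally sets a per-thread flag (`thread_state.proceed = False`) so the seceded thread stops pulling work from the pool's queue.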
Slight preference to leave these as fully namespaced, especially given the double use of delayed
distributed/tests/test_joblib.py
Outdated
with cluster() as (s, [a, b]):
    with joblib.parallel_backend('dask.distributed', loop=loop,
                                 scheduler_host=s['address']) as (ba, _):
Right, so about the "let's just always use the default client" argument from above, this would become the following:
with Client(s['address'], loop=loop) as client:
    with joblib.parallel_backend('dask') as (ba, _):
        ...
Ah, I was getting a bit confused earlier about when
Maybe @jcrist has thoughts here too.
I think that we can evolve into this API somewhat smoothly. I might suggest the following interface:

def __init__(self, scheduler_host=None, client=None, **kwargs):
    if client is None:
        if scheduler_host is not None:
            client = Client(scheduler_host, **kwargs)
        else:
            client = get_client()
    ...
This is now a common convention. It also simplifies nested calling.
I've pushed my proposed changes to this fork/branch. Please feel free to accept or reject them as you like.
+1 from me

Also it looks like we need some of these changes due to a new release of joblib?
The change at https://github.com/dask/distributed/pull/1705/files#diff-bf3f88e462cfc84fc9199b072c6b4e6bR66 will be required for the new joblib. Everything else changing in joblib should be backwards compatible.
distributed/joblib.py
Outdated
pass

yield
# No need to rejoin here
Why? Won't that create too many threads on the distributed worker nodes?
Also, is it valid to call secede() several times in the same thread? That would happen if there are several consecutive calls to Parallel() in a nested function. We should probably add a test for that case.
If we're likely to do more work in this thread after completing then yes, we should rejoin.
@ogrisel I suspect that you're thinking of cases like the following?
results = Parallel()(...)
# do work
results2 = Parallel()(...)
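One way to make such consecutive calls safe against double-secede, sketched with stdlib only (this is illustrative, not the fix actually merged in this PR): track per-thread whether the thread has already seceded and make further calls no-ops.

```python
import threading

_state = threading.local()

def safe_secede(do_secede):
    """Call do_secede() at most once per thread.

    do_secede is a zero-argument callable standing in for the real
    secede(); returns True if it ran, False if this thread had
    already seceded.
    """
    if getattr(_state, "seceded", False):
        return False  # already seceded; calling again is a no-op
    do_secede()
    _state.seceded = True
    return True

calls = []
assert safe_secede(lambda: calls.append("secede")) is True
assert safe_secede(lambda: calls.append("secede")) is False
assert calls == ["secede"]  # the second call did nothing
```

A matching rejoin would clear the flag, so secede/rejoin pairs still work across several Parallel() blocks in one task.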
distributed/tests/test_joblib.py
Outdated
delayed = joblib.delayed

def get_nested_pids():
    return Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2))
It would be more interesting to have:

def get_nested_pids():
    pids = set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2)))
    return pids.union(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2)))

This was a good test. It failed before and passes with the most recent commit.
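The shape of that test generalizes: run the nested call twice in a row and union the results, so a bug that only appears on the second consecutive nested call (like a bad double-secede) is exercised. A thread-based stdlib analogue, using threading.get_ident in place of os.getpid since threads share one pid:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def get_nested_idents():
    # Two consecutive "nested parallel" calls against the same pool,
    # unioned, mirroring the suggested get_nested_pids test shape.
    with ThreadPoolExecutor(max_workers=2) as pool:
        first = set(pool.map(lambda _: threading.get_ident(), range(2)))
        second = set(pool.map(lambda _: threading.get_ident(), range(2)))
    return first | second

idents = get_nested_idents()
assert threading.get_ident() not in idents  # all work ran on pool threads
```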
This implements the new API introduced in joblib/joblib#538 to ensure nested parallelism works correctly.

This breaks our API as previously DaskDistributedBackend had a client attribute with the Client that was used. I believe (though I may be wrong) that the backend object gets serialized in nested calls, and clients are not serializable.

Nested parallelism will be workers creating a DaskDistributedBackend, with no 'scheduler_host'. To avoid deadlocks, worker threads will secede and rejoin.

cc @ogrisel.