
Handle nested parallelism in distributed.joblib #1705

Merged

TomAugspurger merged 6 commits into dask:master from TomAugspurger:distributed-joblib-nested on Jan 25, 2018

Conversation

@TomAugspurger
Member

This implements the new API introduced in
joblib/joblib#538 to ensure nested parallelism works
correctly.

This breaks our API: previously, DaskDistributedBackend had a client attribute
holding the Client that was used. I believe (though I may be wrong) that the backend object gets serialized in nested calls, and clients are not serializable.

Nested parallelism means workers creating a DaskDistributedBackend with no
'scheduler_host'. To avoid deadlocks, worker threads will secede and rejoin.

cc @ogrisel.

@mrocklin
Member

I wonder if maybe we should just use get_client() all the time. Perhaps it was a mistake to create a client within this wrapper. Perhaps we should expect the user to create a client externally and either pass that in or else depend on the get_client() global default. Thoughts?

try:
    rejoin()
except AttributeError:
    pass
Member

Should we rejoin here? Typically this is necessary if there is additional computational work to do in the task. That may not be the case here. It's harmless either way, but might add a tiny bit of unwanted delay.

Member Author

I wasn't sure about that either. To make sure I understand, when the worker thread secedes, the scheduler will tell the worker to make a new thread to take its place? In that case, yes we should be OK just not rejoining.

Member

Right, the thread leaves the thread pool, leaving it with n - 1 threads. We make a new thread to take its place. Relevant code here: https://github.com/dask/distributed/blob/master/distributed/threadpoolexecutor.py#L94-L105

def secede(adjust=True):
    """ Have this thread secede from the ThreadPoolExecutor
    See Also
    --------
    rejoin: rejoin the thread pool
    """
    thread_state.proceed = False
    with threads_lock:
        thread_state.executor._threads.remove(threading.current_thread())
        if adjust:
            thread_state.executor._adjust_thread_count()
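For intuition, here is a pure-stdlib toy (hypothetical, not distributed's implementation) of the deadlock that secede() avoids: a task in a single-thread pool submits a subtask to the same pool and blocks waiting on it, so the subtask can never start.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

pool = ThreadPoolExecutor(max_workers=1)

def outer():
    # The subtask is queued behind this very task, which occupies the
    # pool's only thread, so it cannot start while we wait on it.
    inner = pool.submit(lambda: 42)
    return inner.result(timeout=1)

starved = False
try:
    pool.submit(outer).result()
except TimeoutError:
    starved = True  # the nested task never ran within the timeout
pool.shutdown(wait=True)
print("starved:", starved)
```

In distributed, secede() lets the blocked thread leave the pool (with a replacement thread started to keep the pool at full size), so nested work can proceed; rejoin() reverses it.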

if joblib is None or LooseVersion(joblib.__version__) <= "0.11.0":
    pytest.skip("Joblib >= 0.11.1 required.")
Parallel = joblib.Parallel
delayed = joblib.delayed
Member

Slight preference to leave these fully namespaced, especially given the double use of `delayed`.


with cluster() as (s, [a, b]):
    with joblib.parallel_backend('dask.distributed', loop=loop,
                                 scheduler_host=s['address']) as (ba, _):
Member

Right, so about the "let's just always use the default client" argument from above, this would become the following:

with Client(s['address'], loop=loop) as client:
    with joblib.parallel_backend('dask') as (ba, _):
        ...

@TomAugspurger
Member Author

Perhaps we should expect the user to create a client externally and either pass that in or else depend on the get_client() global default.

Ah, I was getting a bit confused earlier about when get_client was raising a ValueError. I was thinking the ValueError only came when you were on a worker, but it could also be when the user hasn't created a client. I'll think more about requiring an existing client. I'm 50/50 right now.

Maybe @jcrist has thoughts here too.

@mrocklin
Member

Ah, I was getting a bit confused earlier about when get_client was raising a ValueError. I was thinking the ValueError only came when you were on a worker, but it could also be when the user hasn't created a client. I'll think more about requiring an existing client. I'm 50/50 right now.

I think that we can evolve into this API somewhat smoothly. I might suggest the following interface:

def __init__(self, scheduler_host=None, client=None, **kwargs):
    if client is None:
        if scheduler_host is not None:
            client = Client(scheduler_host, **kwargs)
        else:
            client = get_client()
    ...

This is now a common convention. It also simplifies nested calling.
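As a runnable sketch of that resolution order (with hypothetical stand-ins for distributed's Client and get_client so it runs without dask), the precedence would be: explicit client, then scheduler_host, then the global default.

```python
# Hedged sketch of the proposed __init__ resolution order; make_client and
# get_default are hypothetical placeholders, not the real distributed API.
def resolve_client(scheduler_host=None, client=None,
                   make_client=lambda host: f"Client({host})",
                   get_default=lambda: "default-client"):
    if client is not None:
        return client                       # explicit client wins
    if scheduler_host is not None:
        return make_client(scheduler_host)  # connect to that scheduler
    return get_default()                    # fall back to get_client()

explicit = resolve_client(client="my-client")
by_host = resolve_client(scheduler_host="tcp://10.0.0.1:8786")
fallback = resolve_client()
```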
@mrocklin
Member

I've pushed my proposed changes to this fork/branch. Please feel free to accept or reject them as you like.

@mrocklin
Member

+1 from me

Also it looks like we need some of these changes due to a new release of joblib?

@TomAugspurger
Member Author

TomAugspurger commented Jan 24, 2018 via email

pass

yield
# No need to rejoin here
Contributor

Why? Won't that create too many threads on the distributed worker nodes?

Also, is it valid to call secede() several times in the same thread? That would happen if there are several consecutive calls to Parallel() in a nested function. We should probably add a test for that case.

Member

If we're likely to do more work in this thread after completing then yes, we should rejoin.

@ogrisel I suspect that you're thinking of cases like the following?

results = Parallel()(...)

# do work

results2 = Parallel()(...)

Contributor

yes

delayed = joblib.delayed

def get_nested_pids():
    return Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2))
Contributor

It would be more interesting to have:

def get_nested_pids():
    pids = set(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2)))
    return pids.union(Parallel(n_jobs=2)(delayed(os.getpid)() for _ in range(2)))

Member

This was a good test. It failed before and passes with the most recent commit.

Contributor

@ogrisel ogrisel left a comment

LGTM.
