[MRG] Add a dask.distributed example #613
Conversation
For discussion and early review. Cc @TomAugspurger
Codecov Report
    @@            Coverage Diff            @@
    ##           master     #613    +/- ##
    ========================================
    - Coverage   95.29%    95.2%    -0.1%
    ========================================
      Files          39       39
      Lines        5462     5462
    ========================================
    - Hits         5205     5200       -5
    - Misses        257      262       +5
Continue to review full report at Codecov.
    return i

    with joblib.parallel_backend('dask.distributed',
                                 scheduler_host=client.scheduler.address):
client.scheduler.address isn't always available, depending on the type of cluster (it is available for the LocalCluster created automatically by Client(), but not necessarily for others).
I believe the recommended way would be:

    address = client.scheduler_info()['address']
    with joblib.parallel_backend('dask.distributed',
                                 scheduler_host=address):
client.scheduler.address is always available; in this case, scheduler is actually the connection object, not the scheduler itself. In either event, though, you don't need either.
Also, if relevant, I hope to release dask.distributed within a week.
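To make the address lookup discussed above concrete, here is a minimal, runnable sketch. The `scheduler_address` helper and `FakeClient` class are hypothetical stand-ins of my own (FakeClient mimics the relevant bit of `dask.distributed`'s `Client`), so no running scheduler is needed:

```python
def scheduler_address(client):
    # The portable form suggested in the review: scheduler_info() works
    # regardless of the cluster type backing the client.
    return client.scheduler_info()['address']


class FakeClient:
    """Hypothetical stand-in for dask.distributed.Client."""
    def scheduler_info(self):
        # A real Client returns a larger dict; only 'address' matters here.
        return {'address': 'tcp://127.0.0.1:8786'}


print(scheduler_address(FakeClient()))
```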
    Realistic usage scenario: combining dask code with joblib code, for
    instance using dask for preprocessing data, and scikit-learn for machine
    learning.
Would you consider "prototyping a solution, to later be run on a truly distributed cluster" a "realistic usage scenario"?
That (prototyping before moving to a cluster) and the diagnostics dashboard are my two most common use cases for the distributed scheduler on a single machine.
    @@ -0,0 +1,57 @@
    """
    Using distributed for single_machine parallel computing
single_machine -> single machine
FWIW we're trying to avoid referring to the code in the github.com/dask/distributed repository as distributed. The reason is that it's a fairly generic term. Instead I might recommend just using the term dask here, or, if preferred, dask.distributed.
I'll use dask.distributed.
This naming thing is quite confusing (I can't blame you; it's an error that I have run into over and over, first with enthought.mayavi => mayavi, and later with scikits.learn => scikit-learn, imported as sklearn). However, it will confuse users, and even myself, as it makes the difference and the boundary between projects quite blurry.
    import distributed.joblib  # noqa

    ###############################################################################
    # Run parallel computation using dask.distributed
We could add dask to intersphinx to link to their documentation?
I would love to see this happen. I have no experience with intersphinx myself, but have seen it used more often recently and have liked what I've seen.
    return i

    with joblib.parallel_backend('dask.distributed', scheduler_host=address):
I would even add backend='dask.distributed' or add a small discussion after the title (it seems a bit empty there).
This should suffice if you have already created a client (requires master). You also don't need the address = line above:

    with joblib.parallel_backend('dask'):
Now backend='dask' is very confusing, because, as @TomAugspurger was explaining to me, by default dask doesn't use the distributed backend, but a threading one.
I'll wait for a release of distributed to update this example, as I would like it to run with a released version.
Right, but you'll never use this with the legacy threaded scheduler, it isn't sufficiently flexible to handle Joblib's dynamism. There is only one relevant use case for Dask here, and it's the newer scheduler.
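For reference, the call pattern being debated looks roughly like the sketch below. The dask variants from the review are shown only in comments (they need dask.distributed and a client); the runnable part swaps in joblib's built-in 'threading' backend so the snippet works without dask installed:

```python
import joblib


def long_running_function(i):
    return i


# The variants discussed above (both require dask.distributed):
#
#     import distributed.joblib  # noqa  (registers the backend)
#     with joblib.parallel_backend('dask.distributed', scheduler_host=address):
#         ...
#
# or, per the review, on current master once a Client exists:
#
#     with joblib.parallel_backend('dask'):
#         ...
#
# The same pattern with the built-in threading backend, runnable anywhere:
with joblib.parallel_backend('threading'):
    results = joblib.Parallel(n_jobs=2)(
        joblib.delayed(long_running_function)(i) for i in range(10))

print(results)
```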
    source activate $CONDA_ENV_NAME

    conda install --yes --quiet pip numpy sphinx matplotlib pillow
    conda install --yes --quiet pip numpy sphinx matplotlib pillow dask distributed
Just dask suffices here. The core package is now called dask-core while dask is a metapackage that includes distributed and a few other packages (like numpy, pandas, ...)
    ###############################################################################
    # Setup the distributed client
    ###############################################################################
    from distributed import Client
Similarly we tend to encourage from dask.distributed import Client in examples
This is terribly confusing, you realize. I haven't looked at the codebases of the packages, but my mental model right now is a bit lost as to what package does what.
It's not that confusing if you're coming to it fresh (which most people are). It's really only a pain for the old hands who were around when we called the thing distributed on its own.
    return i

    with joblib.parallel_backend('dask.distributed', scheduler_host=address):

    # Recover the address
    address = client.scheduler_info()['address']

    # This import registers the dask backend for joblib
I think that that was correct: import is used as a noun here.
    joblib.Parallel(n_jobs=2, verbose=100)(
        joblib.delayed(long_running_function)(i)
        for i in range(10))
    # We can check that joblib is indeed using the dask.distributed
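Putting the reviewed fragments together, a consolidated sketch of the example script might read as follows. The try/except guard is my addition, not part of the PR: it lets the snippet fall back to a serial loop when joblib, dask.distributed, or this older registration API is unavailable:

```python
def long_running_function(i):
    return i


try:
    import joblib
    from distributed import Client
    import distributed.joblib  # noqa: registers the 'dask.distributed' backend

    client = Client()  # creates a LocalCluster by default
    # Recover the scheduler address in the portable way
    address = client.scheduler_info()['address']
    with joblib.parallel_backend('dask.distributed', scheduler_host=address):
        results = joblib.Parallel(n_jobs=2, verbose=100)(
            joblib.delayed(long_running_function)(i)
            for i in range(10))
except Exception:
    # Serial fallback when no distributed scheduler can be set up.
    results = [long_running_function(i) for i in range(10)]
```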
I can't get the intersphinx mapping to work. Is distributed.Client documented in what is captured by intersphinx?
Also, if relevant, I hope to release dask.distributed within a week.
Cool!
I think that I'd like to merge this example first, and modify it as soon
as you release dask.distributed.
I also hope that we'll release an alpha of the new joblib soon after.
That way, we'll all be in production soon.
Right, but you'll never use this with the legacy threaded scheduler, it isn't
sufficiently flexible to handle Joblib's dynamism. There is only one relevant
use case for Dask here, and it's the newer scheduler.
OK, and the threaded scheduler is going away? That would explain part of
my confusion.
No, it's staying around. It's useful if you don't have tornado, are allergic to dependencies (it's stdlib only), or if you're doing relatively straightforward dask.array work. The newer scheduler is generally a more robust choice, though.
I found the problem with intersphinx: Client is documented as
distributed.client.Client:
https://github.com/dask/distributed/blob/master/docs/source/api.rst
while we use it as distributed.Client.
I believe that the "currentmodule" should be changed in the file above.
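On the joblib side, the intersphinx idea mentioned earlier might look like the conf.py snippet below. This is a sketch, not part of the PR: the readthedocs URL is an assumption about where distributed's docs lived at the time and should be checked:

```python
# Hypothetical addition to joblib's doc/conf.py: register distributed's
# documentation with intersphinx so :class:`distributed.Client` references
# can resolve once the upstream "currentmodule" issue is fixed.
extensions = ['sphinx.ext.intersphinx']

intersphinx_mapping = {
    'distributed': ('https://distributed.readthedocs.io/en/latest/', None),
}
```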
Is there a way for us to flatten the namespace on our end?
are allergic to dependencies (it's stdlib only),
Thanks for having my health in mind :).
Ha!
Is there a way for us to flatten the namespace on our end?
In the API documentation file, just use "dask" as the "currentmodule": the
currentmodule defines what sphinx considers to be the public path of the
objects.
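Concretely, the fix in distributed's docs/source/api.rst might be a currentmodule change along these lines, so that Client is exposed under the path users actually import (distributed.Client rather than distributed.client.Client). This is a sketch of the idea, not the actual upstream change:

```rst
.. currentmodule:: distributed

.. autoclass:: Client
   :members:
```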
    with joblib.parallel_backend('dask.distributed', scheduler_host=address):
        joblib.Parallel(n_jobs=2, verbose=100)(
In current master, there is no need to put n_jobs=2 here anymore.
            for i in range(10))
    # Check that joblib is indeed using the dask.distributed
    # backend
    print(joblib.Parallel(n_jobs=1)._backend)
This is no longer necessary (on current master), I have added the active backend name in the verbose output of the call to Parallel.
fd6cfdb to c9d6626
I've addressed all issues, and CI is green. Can I have merge?
    ###############################################################################
    # The verbose messages below show that the backend is indeed the
    # dask.distributed one
    with joblib.parallel_backend('dask.distributed', scheduler_host=address):
Adding scheduler_host=address is no longer strictly necessary. Dask will use the most recently created Client by default.
Leaving it in is ok too.
Hence it is visible in the example, which is better.
e18d12a to d110ba5
Merging this guy.
* tag '0.12': (116 commits)
  - Release 0.12
  - typo
  - typo
  - typo
  - ENH add initializer limiting n_threads for C-libs (joblib#701)
  - DOC better parallel docstring (joblib#704)
  - [MRG] Nested parallel call thread bomb mitigation (joblib#700)
  - MTN vendor loky2.1.3 (joblib#699)
  - Make it possible to configure the reusable executor workers timeout (joblib#698)
  - MAINT increase timeouts to make test more robust on travis
  - DOC: use the .joblib extension instead of .pkl (joblib#697)
  - [MRG] Fix exception handling in nested parallel calls (joblib#696)
  - Fix skip test lz4 not installed (joblib#695)
  - [MRG] numpy_pickle: several enhancements (joblib#626)
  - Introduce Parallel.__call__ backend callbacks (joblib#689)
  - Add distributed on readthedocs (joblib#686)
  - Support registration of external backends (joblib#655)
  - [MRG] Add a dask.distributed example (joblib#613)
  - ENH use cloudpickle to pickle interactively defined callable (joblib#677)
  - CI freeze the version of sklearn0.19.1 and scipy1.0.1 (joblib#685)
  - ...
A simple dask.distributed example