
Joblib scatter #1022

Merged
jcrist merged 4 commits into dask:master from jcrist:joblib-scatter
Apr 27, 2017

Conversation

@jcrist
Member

@jcrist jcrist commented Apr 17, 2017

Allows prescattering data to the cluster. For large arguments (big arrays, etc.) that are used in more than one task, this can be more efficient, as it avoids serializing the data for every task.

Also fixes the distributed backend to be an instance of ParallelBackendBase for both the scikit-learn and joblib modules.
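The idea behind prescattering can be illustrated with a small self-contained sketch (hypothetical helper name; the real backend substitutes distributed Futures): prescattered objects are matched against task arguments by object identity, so every reference to the same large object is replaced by a single remote handle instead of being reserialized per task.

```python
def replace_scattered(args, scattered, handles):
    # Map each prescattered object's id() to its remote handle; any
    # argument that is the *same object* is swapped for the handle.
    lookup = {id(obj): h for obj, h in zip(scattered, handles)}
    return [lookup.get(id(a), a) for a in args]

big = list(range(1000))         # stand-in for a large array
handle = "<future-for-big>"     # stand-in for a distributed Future
print(replace_scattered([big, 3, big], [big], [handle]))
# both references to `big` are replaced; the small int is untouched
```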

@jcrist
Member Author

jcrist commented Apr 17, 2017

My benchmarks show this helps, but only a little. I have yet to find a scikit-learn task where the distributed backend is noticeably more performant than the existing threading/multiprocessing backends (except for grid search, which is still beaten by dask-searchcv). This PR helps, but only for code where the serialization time is large compared to the compute time.

One issue is that internal scikit-learn code may transform the input data, so the objects that make it to joblib aren't the same as the input ones (e.g. the y that reaches the joblib step isn't the same as was input to fit). This seems to be mainly true for y, and less so for X (which is usually larger). I can't think of a simple workaround for this, and am not convinced that the possible speedup is worth the code complication.

@mrocklin
Member

Are the y's different objects or are they changing in place?

@mrocklin
Member

I wouldn't be surprised if joblib was near full speed on a single machine. This might have more of an impact on a cluster where communication costs are larger.

@jcrist
Member Author

jcrist commented Apr 18, 2017

Are the y's different objects or are they changing in place?

Different objects. Usually it's a small thing like a reshape or dtype change.
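This failure mode is easy to demonstrate with plain Python (a list standing in for a label array): any copying transformation produces an equal but distinct object, so an identity-based scatter lookup no longer matches it.

```python
y = [0, 1, 1, 0]   # stand-in for the labels passed to fit()
y2 = y[:]          # a copy, like sklearn's internal reshape/dtype step

print(y2 == y)     # True: same values
print(y2 is y)     # False: different object, so an id()-based
                   # scatter substitution misses it entirely
```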

@mrocklin
Member

What is the status here? Is this code helpful?

@jcrist
Member Author

jcrist commented Apr 26, 2017

It's useful, but still fails to beat single-machine performance for anything except grid_search (which is still beaten by dask_searchcv). It does improve performance though. Fix merge conflicts and merge?

@mrocklin
Member

It's useful, but still fails to beat single-machine performance for anything except grid_search (which is still beaten by dask_searchcv). It does improve performance though. Fix merge conflicts and merge?

Do you mean to say that if we have a cluster we should still prefer to use a single machine or that on a single machine one should use the threaded/multiprocessing joblib backends? If the former then did you get a sense for what the limiting factor was?

@jcrist
Member Author

jcrist commented Apr 26, 2017

The latter. The former depends on the work:serialization-cost ratio and on how many tasks there are. Grid search is a good use case here, as fitting individual estimators is usually pretty expensive and we're using the same data multiple times (which can be cached).
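The trade-off can be sketched with a naive back-of-envelope model (illustrative arithmetic only, not dask's actual scheduling): the cluster wins when per-task work dwarfs serialization, and prescattering helps by paying the serialization cost once instead of once per task.

```python
def est_speedup(work, serialize, n_tasks, n_workers, prescatter=False):
    # Naive model: serial time is n_tasks * work; distributed time pays
    # serialization (once if prescattered, else once per task) plus the
    # compute spread evenly over the workers.
    ser_cost = serialize if prescatter else serialize * n_tasks
    return (n_tasks * work) / (ser_cost + n_tasks * work / n_workers)

# Expensive fits over shared, cacheable data (the grid-search case):
print(est_speedup(work=10.0, serialize=1.0, n_tasks=24, n_workers=24,
                  prescatter=True))
# Cheap tasks dominated by per-task serialization (slower than serial):
print(est_speedup(work=0.1, serialize=1.0, n_tasks=24, n_workers=24))
```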

@mrocklin
Member

Are there joblib-accelerated algorithms within sklearn that have a high work-to-serialization-cost ratio other than grid search? cc @ogrisel

@jcrist
Member Author

jcrist commented Apr 26, 2017

Ok, I ran a benchmark of fitting using a modified version of this scikit-learn benchmark (see here for the code). This fits a RandomForestClassifier and an ExtraTreesClassifier on the same data using both the threading and dask.distributed backends.

The threading backend was run on a single m4.2xlarge instance with 8 cores. The distributed backend was run with three of these as workers, for a total of 24 cores. Here are the results:

ubuntu@ip-172-31-43-228:~$ python bench.py
Loading dataset...
Creating train-test split...
Dataset statistics:
===================
number of features:       54
number of classes:        2
data type:                float32
number of train samples:  522911 (pos=332178, neg=190733, size=112MB)
number of test samples:   58101 (pos=36994, neg=21107, size=12MB)

Training Classifiers
====================
Training ExtraTreesClassifier with threading backend... done
Training ExtraTreesClassifier with dask.distributed backend... done
Training RandomForest with threading backend... done
Training RandomForest with dask.distributed backend... done

Classification performance:
===========================
Classifier                              train-time  test-time  error-rate
-------------------------------------------------------------------------
RandomForest, threading                   35.2176s    0.5228s      0.0296
RandomForest, dask.distributed            11.4347s    3.0020s      0.0296
ExtraTreesClassifier, threading           33.4432s    0.7229s      0.0325
ExtraTreesClassifier, dask.distributed    16.7369s    6.5222s      0.0325

As you can see, train time for RandomForestClassifier is roughly 1/3 when run distributed (which makes sense with 3x the cores). ExtraTreesClassifier is roughly 1/2. Note that predict time is slower though, which also makes sense, as predict is a pretty fast operation.

I tried running without the scatter option and ran into some memory errors in pickle - not sure of the cause.

As such, I think this is a net win. I have a patch for scikit-learn that I haven't pushed yet to allow overriding the backend for fitting on these classes, which would avoid the need for the monkeypatch in the benchmark above.

(cc @amueller, who recommended this benchmark a few months ago :))

@mrocklin
Member

To my naive understanding this seems pretty great?

Hijacking joblib like this seems like the least intrusive way to accelerate scikit-learn with dask.distributed. It would be interesting to see both how well this scales and also what other common computations in scikit-learn might benefit.

also cc'ing @GaelVaroquaux

From the perspective of this PR I think it's clear that it provides an improvement. Working towards merging sounds good to me.

@amueller

That looks pretty good indeed. Maybe get some error bars? It looks like the speedup for RF is slightly more than 3x which is a bit strange ;)
But I believe that you can get nearly 3x. I'm a bit surprised by the difference between RF and ET (though error bars again?).
I haven't looked at the code but it sounds like you're monkey patching? Surely joblib could add the proper interface?

@jcrist
Member Author

jcrist commented Apr 26, 2017

To my naive understanding this seems pretty great?

Yeah, not sure why previous numbers were less great. Might have grabbed the wrong commit when running them.

-MIN_IDEAL_BATCH_DURATION = 0.2
-MAX_IDEAL_BATCH_DURATION = 1.0
+MIN_IDEAL_BATCH_DURATION = 0.5
+MAX_IDEAL_BATCH_DURATION = 5.0
Member

How did you come by these values?

Member Author

Oop, leftover from debugging.

self.client = Client(scheduler_host, loop=loop)
if scatter is not None:
# Keep a reference to the scattered data to keep the ids the same
self._scatter = list(scatter)
Member

What happens if someone gives us a single numpy array?

Member Author

Then it's bad. We should only accept lists here, because technically any object can be prescattered, so it'd be hard to know whether someone wanted us to scatter the object itself or the elements inside it. Will fix to error if not a list/tuple.
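The promised validation could look something like this sketch (hypothetical function name, not the code that was merged):

```python
def normalize_scatter(scatter):
    # Only a list or tuple is accepted: since any object can be
    # prescattered, a bare object would be ambiguous (scatter it,
    # or scatter the elements inside it?).
    if scatter is None:
        return []
    if not isinstance(scatter, (list, tuple)):
        raise TypeError("scatter must be a list or tuple, got %s"
                        % type(scatter).__name__)
    return list(scatter)
```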

return Batch(tasks), args2

def apply_async(self, func, callback=None):
key = '%s-%s' % (joblib_funcname(func), uuid4().hex)
Member

Should this add "%s-batch-%s" or something to signify that this is more than one function call?

Member Author

Sure.
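The suggested naming is a one-liner; a sketch assuming the uuid4-based key from the snippet above:

```python
from uuid import uuid4

def batch_key(funcname):
    # Embed "batch" in the task key so the scheduler dashboard makes
    # clear that this task wraps more than one function call.
    return '%s-batch-%s' % (funcname, uuid4().hex)

print(batch_key('fit'))  # e.g. 'fit-batch-<32 hex chars>'
```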

@jcrist
Member Author

jcrist commented Apr 26, 2017

I haven't looked at the code but it sounds like you're monkey patching? Surely joblib could add the proper interface?

Was just monkey-patching in the benchmark. Scikit-learn hardcodes the backend into the Parallel calls for these classes. I have a patch for scikit-learn that checks for a global backend setting first, and if unset uses the locally provided default. It looks like:

Parallel(n_jobs=self.n_jobs, verbose=self.verbose,
         backend=get_joblib_backend(default="threading"))(...)

There might be a better way though.
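A minimal sketch of what such a get_joblib_backend helper might look like (hypothetical names; the real patch lives in scikit-learn/joblib, not here):

```python
_global_backend = None

def register_backend(backend):
    # Globally override the backend scikit-learn would otherwise hardcode.
    global _global_backend
    _global_backend = backend

def get_joblib_backend(default="threading"):
    # Prefer the globally registered backend; otherwise fall back to
    # the locally provided default.
    return _global_backend if _global_backend is not None else default

print(get_joblib_backend())             # -> threading
register_backend("dask.distributed")
print(get_joblib_backend())             # -> dask.distributed
```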

jcrist added 4 commits April 26, 2017 17:47
Allows to prescatter data to the cluster. For large arguments (big
arrays, etc...) that are used in more than one task, this can be more
efficient as it avoids serializing the data for every task.

Also fix to ensure distributed backend is an instance of the
`ParallelBackendBase` for both scikit-learn and joblib modules.
@jcrist
Member Author

jcrist commented Apr 27, 2017

I think this should be good to go.

sols = [func(*args, **kwargs) for func, args, kwargs in tasks]
results = Parallel()(tasks)

ba.terminate()
Member

Why is this call necessary? Should this be called as part of __exit__?

Member Author

@jcrist jcrist Apr 27, 2017

Backends aren't context managers; there is no __exit__ on the backend. The call was already there for the other tests (e.g. https://github.com/dask/distributed/blob/master/distributed/tests/test_joblib.py#L42); I moved it to use terminate, which is more consistent with the other joblib backends.

return funcname(x)


class Batch(object):
Member

Will users ever see this object? If so should it get a one-line docstring?

Member Author

No, this is completely internal.
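For reference, the internal Batch object is essentially a callable that bundles several tasks into one remote call; a simplified sketch consistent with the list comprehension quoted above (not the exact merged code):

```python
class Batch(object):
    """Internal: run several (func, args, kwargs) tasks in one call,
    amortizing per-task scheduling and serialization overhead."""

    def __init__(self, tasks):
        self.tasks = tasks

    def __call__(self):
        # Execute each task in order, returning the list of results.
        return [func(*args, **kwargs) for func, args, kwargs in self.tasks]

tasks = [(pow, (2, 5), {}), (max, (3, 7), {})]
print(Batch(tasks)())  # -> [32, 7]
```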

@mrocklin
Member

+1 from me

@jcrist jcrist merged commit 8140a74 into dask:master Apr 27, 2017
@jcrist jcrist deleted the joblib-scatter branch April 27, 2017 00:53
@GaelVaroquaux

GaelVaroquaux commented Apr 27, 2017 via email

@amueller

@jcrist can you raise a PR with the patch to sklearn?

@jcrist
Member Author

jcrist commented May 17, 2017

I could, but wanted to wait until a decision on how best to override scikit-learn estimators was reached (see relevant issue: scikit-learn/scikit-learn#8804).

@jcrist
Member Author

jcrist commented May 23, 2017

@jcrist can you raise a PR with the patch to sklearn?

See the simple patch to scikit-learn here, or the more complicated but probably better proposal to joblib here.
