
Status update on Incremental and Grid Search #206

@TomAugspurger

Quick status update: I wanted to explore a workflow that would use scikit-learn
as much as possible. We'd use scikit-learn for all the hyper-parameter
optimization and the actual training. Dask would just provide the large
arrays.

The end goal is something as close as possible to GridSearchCV(SGDClassifier()) trained on a larger-than-memory dataset.

X, y = load_dask_arrays()

clf = sklearn.linear_model.SGDClassifier()
gs = sklearn.model_selection.GridSearchCV(clf, param_grid)
gs.fit(X, y)

First, we have to avoid passing a large Dask array to SGDClassifier.fit, as it
would be converted to an ndarray. So we wrap it with Incremental, which passes
blocks to the SGDClassifier.partial_fit:

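import sklearn.linear_model
import sklearn.model_selection
import dask_ml.wrappers

X, y = load_dask_arrays()

clf = sklearn.linear_model.SGDClassifier()
# Wrap the estimator so that fit() feeds Dask blocks to clf.partial_fit
inc = dask_ml.wrappers.Incremental(clf)
# Search over the wrapped estimator, not the bare SGDClassifier
gs = sklearn.model_selection.GridSearchCV(inc, param_grid)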
gs.fit(X, y)
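
To make the block-wise training concrete, here is a minimal sketch of the idea behind the wrapper. It is not Dask-ML's implementation: fit_blockwise is a hypothetical helper, and in-memory NumPy chunks stand in for the blocks of a Dask array.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier


def fit_blockwise(estimator, X_blocks, y_blocks, classes):
    """Hypothetical helper: feed one block at a time to partial_fit."""
    for X_block, y_block in zip(X_blocks, y_blocks):
        # partial_fit needs the full set of classes, because a single
        # block is not guaranteed to contain every label.
        estimator.partial_fit(X_block, y_block, classes=classes)
    return estimator


rng = np.random.RandomState(0)
X = rng.normal(size=(1000, 5))
y = (X[:, 0] > 0).astype(int)

# Ten chunks of 100 rows stand in for the blocks of a Dask array.
X_blocks = np.array_split(X, 10)
y_blocks = np.array_split(y, 10)

clf = fit_blockwise(
    SGDClassifier(random_state=0), X_blocks, y_blocks, classes=np.array([0, 1])
)
```

Only one block needs to be in memory per partial_fit call, which is what makes this pattern usable on larger-than-memory data.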

At this point, inc.fit(X, y) works fine (I'm currently debugging unexpectedly
high memory usage, but let's ignore that for now), but there's a subtle issue with
inc.score(X, y), which surfaces in GridSearchCV. By default, a pass-through
scorer is used, which is SGDClassifier.score. This ends up calling
sklearn.metrics.accuracy_score, which converts the test Dask arrays
into large ndarrays on a single worker. This is captured in #200.

A workaround to the scoring issue is to manually pass a scorer that is able to
work well with Dask arrays. We've implemented a few in Dask-ML:

import dask_ml.wrappers
import dask_ml.metrics
from sklearn.metrics import make_scorer

X, y = load_dask_arrays()
scorer = make_scorer(dask_ml.metrics.accuracy_score)

clf = sklearn.linear_model.SGDClassifier()
inc = dask_ml.wrappers.Incremental(clf, scoring=scorer)
gs = sklearn.model_selection.GridSearchCV(inc, param_grid)
gs.fit(X, y)
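
As a rough illustration of why a Dask-aware scorer avoids the single-worker blow-up: accuracy can be accumulated one block at a time, so the full label arrays never have to be gathered in one place. The sketch below uses NumPy chunks in place of Dask blocks; blockwise_accuracy is a hypothetical name and this is not how dask_ml.metrics.accuracy_score is actually written.

```python
import numpy as np


def blockwise_accuracy(y_true_blocks, y_pred_blocks):
    """Hypothetical helper: accumulate accuracy over aligned blocks."""
    correct = 0
    total = 0
    for y_true, y_pred in zip(y_true_blocks, y_pred_blocks):
        # Only per-block counts are kept, never the full arrays.
        correct += int(np.sum(y_true == y_pred))
        total += len(y_true)
    return correct / total


y_true = np.array([0, 1, 1, 0, 1, 0, 1, 1])
y_pred = np.array([0, 1, 0, 0, 1, 1, 1, 1])

acc = blockwise_accuracy(np.array_split(y_true, 4), np.array_split(y_pred, 4))
```

The result matches the accuracy computed on the concatenated arrays, because accuracy is just a ratio of two sums that decompose over blocks.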

This gets us serial hyper-parameter optimization on larger-than-memory Dask
arrays. To run the search in parallel, we can use the distributed joblib backend:

import dask_ml.joblib
from sklearn.externals import joblib


with joblib.parallel_backend("dask"):
    gs.fit(X, y)
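
For readers unfamiliar with the pattern: the context manager changes how joblib.Parallel calls made inside the block (such as the ones GridSearchCV makes internally) are dispatched. A minimal sketch of the same mechanism, using joblib's built-in "threading" backend as a stand-in for "dask" so that it runs without a cluster; square is a hypothetical example function:

```python
from joblib import Parallel, delayed, parallel_backend


def square(x):
    return x * x


# Any Parallel call inside this block is dispatched to the selected backend.
with parallel_backend("threading", n_jobs=2):
    results = Parallel()(delayed(square)(i) for i in range(5))
```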

I'll post benchmarks later when I've run them.
