Failing to saturate Dask cluster after initial dispatch #1020

@TomAugspurger

Description

I've noticed that joblib isn't dispatching enough tasks to keep all the workers on my Dask cluster busy.

On joblib master, I see something like the following.

Setup

from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline

# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()

pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])

import numpy as np

parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': np.linspace(0.00001, 0.000001, 5),
    'clf__penalty': ('l2', 'elasticnet'),
}

grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1,
                           verbose=1, cv=5, refit=False,
                           iid=False, pre_dispatch="all")

import joblib
from distributed import performance_report

Training:

with performance_report("master.html"):
    with joblib.parallel_backend('dask') as (bend, _):
        grid_search.fit(data.data, data.target)

https://gistcdn.githack.com/TomAugspurger/bee36fce1d1bd50b922d1155423cca21/raw/be26141275d3b1fec5c8bdf0f7f3f8bd1a7c0a22/master.html (navigate to the "task stream" tab)

This one doesn't actually look that bad. The problem is more pronounced on actual clusters with more cores.
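For scale, the grid above produces a fixed amount of work, which is the total task count the scheduler has to be fed. A quick back-of-the-envelope count (plain Python, independent of joblib; the alpha stand-in just preserves the length of the linspace):

```python
# Count the fits GridSearchCV generates for the grid above:
# 2 (use_idf) * 2 (norm) * 5 (alpha) * 2 (penalty) candidates, cv=5 folds each.
from itertools import product

parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': tuple(range(5)),   # stand-in for the 5 linspace values
    'clf__penalty': ('l2', 'elasticnet'),
}

n_candidates = len(list(product(*parameters.values())))
n_fits = n_candidates * 5  # cv=5 folds per candidate
print(n_candidates, n_fits)  # 40 200
```

So there are only a couple hundred tasks total here; how they are batched determines how many are in flight at once.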

Poking around, I notice that joblib is (I think) submitting tasks one by one in Parallel.dispatch_one_batch. If I make this change

diff --git a/joblib/parallel.py b/joblib/parallel.py
index 9a0b158a..f4e8bbea 100644
--- a/joblib/parallel.py
+++ b/joblib/parallel.py
@@ -818,9 +818,9 @@ class Parallel(Logger):
                     # the original iterator) -- decrease the batch size to
                     # account for potential variance in the batches running
                     # time.
-                    final_batch_size = max(1, len(islice) // (10 * n_jobs))
+                    final_batch_size = max(8, len(islice) // (10 * n_jobs))
                 else:
-                    final_batch_size = max(1, len(islice) // n_jobs)
+                    final_batch_size = max(8, len(islice) // n_jobs)
 
                 # enqueue n_jobs batches in a local queue
                 for i in range(0, len(islice), final_batch_size):

then I see the following performance:

https://gistcdn.githack.com/TomAugspurger/e38002b460f7298f32213fa0a966846e/raw/755294cfcee65e1315bc63a6daa42148d4a8760b/patched.html
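To make the effect of that floor concrete, here is the batching arithmetic from the hunk above, run with hypothetical numbers (200 fits, 64 workers); the numbers are illustrative, not measured:

```python
def n_batches(n_tasks, n_jobs, floor, slice_factor=1):
    # mirrors: final_batch_size = max(floor, len(islice) // (slice_factor * n_jobs))
    batch = max(floor, n_tasks // (slice_factor * n_jobs))
    return len(range(0, n_tasks, batch))

# Stock floor of 1: the `10 * n_jobs` branch degenerates to single-task batches.
print(n_batches(200, 64, floor=1, slice_factor=10))  # 200
# Floor of 8 from the patch: 25 batches of 8 tasks each.
print(n_batches(200, 64, floor=8, slice_factor=10))  # 25
```

With the stock floor, `200 // (10 * 64)` is 0, so every batch is a single task; raising the floor to 8 cuts the number of round trips to the scheduler by 8x.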

Assuming my understanding of the situation is correct, which it probably isn't, is it possible for a backend to specify a minimum batch size? Does that make any sense?
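One possible shape for that: the backend advertises a floor and the dispatch code consults it. This is purely a hypothetical sketch; neither the attribute name nor the hook exists in joblib as written here:

```python
# Hypothetical: a backend-advertised minimum batch size that
# Parallel's dispatch logic could fold into its floor of 1.

class FakeDaskBackend:
    # A backend talking to a high-latency scheduler (e.g. distributed)
    # could advertise a larger floor than the hardcoded default of 1.
    min_batch_size = 8

def final_batch_size(n_remaining, n_jobs, backend):
    floor = getattr(backend, 'min_batch_size', 1)
    return max(floor, n_remaining // n_jobs)

print(final_batch_size(200, 64, FakeDaskBackend()))  # 8 under these assumptions
```

Local backends could keep a floor of 1, so the current behavior would be unchanged for threading/loky.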
