Failing to saturate Dask cluster after initial dispatch #1020
Description
I've noticed that my Dask cluster isn't being given enough tasks to saturate the cluster.
On master, I see something like the following
Setup
Details
from pprint import pprint
from time import time
import logging
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]
print("Loading 20 newsgroups dataset for categories:")
print(categories)
data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])
import numpy as np
parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': np.linspace(0.00001, 0.000001, 5),
    'clf__penalty': ('l2', 'elasticnet'),
}
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1,
                           verbose=1, cv=5, refit=False,
                           iid=False, pre_dispatch="all")
import joblib
from distributed import performance_report

with performance_report("master.html"):
    with joblib.parallel_backend('dask') as (bend, _):
        grid_search.fit(data.data, data.target)

https://gistcdn.githack.com/TomAugspurger/bee36fce1d1bd50b922d1155423cca21/raw/be26141275d3b1fec5c8bdf0f7f3f8bd1a7c0a22/master.html (navigate to the "task stream" tab)
This one doesn't actually look that bad. The problem is more pronounced on actual clusters with more cores.
Poking around, I notice that joblib is (I think) submitting tasks one by one in Parallel.dispatch_one_batch. If I make this change
diff --git a/joblib/parallel.py b/joblib/parallel.py
index 9a0b158a..f4e8bbea 100644
--- a/joblib/parallel.py
+++ b/joblib/parallel.py
@@ -818,9 +818,9 @@ class Parallel(Logger):
# the original iterator) -- decrease the batch size to
# account for potential variance in the batches running
# time.
- final_batch_size = max(1, len(islice) // (10 * n_jobs))
+ final_batch_size = max(8, len(islice) // (10 * n_jobs))
else:
- final_batch_size = max(1, len(islice) // n_jobs)
+ final_batch_size = max(8, len(islice) // n_jobs)
# enqueue n_jobs batches in a local queue
# enqueue n_jobs batches in a local queue
for i in range(0, len(islice), final_batch_size):

then I see the following performance.
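To see why raising the floor matters, here is a minimal sketch of the batch-size computation (simplified from the logic in joblib/parallel.py; the function and argument names here are illustrative, not joblib's actual variables):

```python
# Simplified sketch of how the final batch size is chosen when slicing
# remaining tasks across workers. With a floor of 1, a small remainder
# of tasks is dispatched one at a time; raising the floor groups them.
def final_batch_size(tasks_left, n_jobs, minimum=1):
    return max(minimum, tasks_left // n_jobs)

# 4 tasks left on 8 workers: the default floor yields single-task batches.
print(final_batch_size(4, 8))             # -> 1
# With the proposed floor of 8, the remainder goes out as one batch.
print(final_batch_size(4, 8, minimum=8))  # -> 8
# With plenty of tasks the floor is irrelevant either way.
print(final_batch_size(100, 8))           # -> 12
```

So the patch only changes behavior near the tail of the iterator, where integer division would otherwise drive the batch size down to 1 and each dispatch round trip carries a single task.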
Assuming my understanding of the situation is correct, which it probably isn't, is it possible for a backend to specify a minimum batch size? Does that make any sense?
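One way this could look, purely as a sketch: the backend exposes a hint that Parallel consults when computing the floor. Nothing below exists in joblib today; DaskishBackend, min_batch_size, and choose_batch_size are all made-up names for illustration.

```python
# Hypothetical: a backend advertises a minimum batch size, and the
# dispatch logic uses it as the floor instead of the hard-coded 1.
class DaskishBackend:
    min_batch_size = 8  # hypothetical hint, not a real joblib attribute

def choose_batch_size(backend, tasks_left, n_jobs):
    # Fall back to the current behavior (floor of 1) for backends
    # that do not declare a preference.
    floor = getattr(backend, 'min_batch_size', 1)
    return max(floor, tasks_left // n_jobs)

print(choose_batch_size(DaskishBackend(), 4, 8))  # -> 8
print(choose_batch_size(object(), 4, 8))          # -> 1
```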