Skip to content

KeyError in distributed joblib #2058

@TomAugspurger

Description

@TomAugspurger

I haven't been able to reproduce this locally yet.

This is on distributed, dask, & dask-ml master, and the scikit-learn / joblib we used at the sprint.

I'm doing

%%time
with joblib.parallel_backend("dask"):
    gs.fit(X, y, classes=[0, 1])

with X and y being dask arrays (so can't pre-scatter).

The behavior I observe is

  1. Make the call, tasks show up in the dashboard
  2. A short time later, the tasks go black / gray, indicating they failed
  3. The notebook is hanging
  4. ctrl-C to interrupt (the keyboard interrupt you see in the exception below)
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fc37ad6fbf8>, <Future finished exception=AssertionError("yield from wasn't used with future",)>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 204, in maybe_to_futures
    f = call_data_futures[arg]
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 67, in __getitem__
    ref, val = self._data[id(obj)]
KeyError: 140477560921584

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 759, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.6/site-packages/tornado/stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/tornado/ioloop.py", line 780, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 244, in callback_wrapper
    callback(result)  # gets called in separate thread
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 326, in __call__
    self.parallel.dispatch_next()
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 746, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 774, in dispatch_one_batch
    self._dispatch(tasks)
  File "/home/jovyan/src/scikit-learn/sklearn/externals/joblib/parallel.py", line 731, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 234, in apply_async
    func, args = self._to_func_args(func)
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 224, in _to_func_args
    args = list(maybe_to_futures(args))
  File "/opt/conda/lib/python3.6/site-packages/distributed/joblib.py", line 212, in maybe_to_futures
    [f] = self.client.scatter([arg], broadcast=3)
AssertionError: yield from wasn't used with future

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in maybe_to_futures(args)
    203                     try:
--> 204                         f = call_data_futures[arg]
    205                     except KeyError:

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in __getitem__(self, obj)
     66     def __getitem__(self, obj):
---> 67         ref, val = self._data[id(obj)]
     68         if ref() is not obj:

KeyError: 140477560921344

During handling of the above exception, another exception occurred:

KeyboardInterrupt                         Traceback (most recent call last)
<timed exec> in <module>()

~/src/scikit-learn/sklearn/model_selection/_search.py in fit(self, X, y, groups, **fit_params)
    658                                   error_score=self.error_score)
    659           for parameters, (train, test) in product(candidate_params,
--> 660                                                    cv.split(X, y, groups)))
    661 
    662         # if one choose to see train score, "out" will contain train score info

~/src/scikit-learn/sklearn/externals/joblib/parallel.py in __call__(self, iterable)
    943                 self._iterating = self._original_iterator is not None
    944 
--> 945             while self.dispatch_one_batch(iterator):
    946                 pass
    947 

~/src/scikit-learn/sklearn/externals/joblib/parallel.py in dispatch_one_batch(self, iterator)
    772                 return False
    773             else:
--> 774                 self._dispatch(tasks)
    775                 return True
    776 

~/src/scikit-learn/sklearn/externals/joblib/parallel.py in _dispatch(self, batch)
    729         with self._lock:
    730             job_idx = len(self._jobs)
--> 731             job = self._backend.apply_async(batch, callback=cb)
    732             # A job can complete so quickly than its callback is
    733             # called before we get here, causing self._jobs to

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in apply_async(self, func, callback)
    232     def apply_async(self, func, callback=None):
    233         key = '%s-batch-%s' % (joblib_funcname(func), uuid4().hex)
--> 234         func, args = self._to_func_args(func)
    235 
    236         future = self.client.submit(func, *args, key=key, **self.submit_kwargs)

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in _to_func_args(self, func)
    222         tasks = []
    223         for f, args, kwargs in func.items:
--> 224             args = list(maybe_to_futures(args))
    225             kwargs = dict(zip(kwargs.keys(), maybe_to_futures(kwargs.values())))
    226             tasks.append((f, args, kwargs))

/opt/conda/lib/python3.6/site-packages/distributed/joblib.py in maybe_to_futures(args)
    210                             # more workers need to reuse this data concurrently
    211                             # beyond the initial broadcast arity.
--> 212                             [f] = self.client.scatter([arg], broadcast=3)
    213                             call_data_futures[arg] = f
    214 

/opt/conda/lib/python3.6/site-packages/distributed/client.py in scatter(self, data, workers, broadcast, direct, hash, maxsize, timeout, asynchronous)
   1771                              broadcast=broadcast, direct=direct,
   1772                              local_worker=local_worker, timeout=timeout,
-> 1773                              asynchronous=asynchronous, hash=hash)
   1774 
   1775     @gen.coroutine

/opt/conda/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    650             return future
    651         else:
--> 652             return sync(self.loop, func, *args, **kwargs)
    653 
    654     def __repr__(self):

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    271     else:
    272         while not e.is_set():
--> 273             e.wait(10)
    274     if error[0]:
    275         six.reraise(*error[0])

/opt/conda/lib/python3.6/threading.py in wait(self, timeout)
    549             signaled = self._flag
    550             if not signaled:
--> 551                 signaled = self._cond.wait(timeout)
    552             return signaled
    553 

/opt/conda/lib/python3.6/threading.py in wait(self, timeout)
    297             else:
    298                 if timeout > 0:
--> 299                     gotit = waiter.acquire(True, timeout)
    300                 else:
    301                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions