Description
I've run into an issue that may or may not be a bug. I'm using dask.array.map_blocks to execute a custom processing function. Dask is raising a TypeError in _execute_task and I've had quite a difficult time sorting out the reason for the error. @mrocklin was thinking it may be that I "have constructed a graph by hand and that that graph is not consistent. Some key is listed as a dependency but is not actually in the task graph".
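One way to test that hypothesis is to scan the graph for arguments that look like chunk keys but are not defined anywhere in it. The sketch below is a heuristic, not a dask API. `find_missing_chunk_keys` is a hypothetical helper. It assumes the graph is available as a plain dict (for example `dict(arr.dask)` for a dask array `arr`) and that chunk keys follow the dask.array-style form `('name', i, j, ...)`. The example graph reuses the failing task from the traceback below, placed under a hypothetical output key `('out', 0, 0, 0)`.

```python
from operator import getitem
from typing import Any, Dict, Hashable, List, Tuple


def _is_task(obj: Any) -> bool:
    # Graph convention: a task is a tuple whose first element is callable.
    return isinstance(obj, tuple) and len(obj) > 0 and callable(obj[0])


def _looks_like_chunk_key(obj: Any) -> bool:
    # Heuristic (assumption): dask.array chunk keys look like ('name', i, j, ...).
    return (
        isinstance(obj, tuple)
        and len(obj) > 1
        and isinstance(obj[0], str)
        and all(isinstance(i, int) for i in obj[1:])
    )


def find_missing_chunk_keys(dsk: Dict[Hashable, Any]) -> List[Tuple[Hashable, Any]]:
    """Hypothetical helper: return (owning_key, referenced_key) pairs for
    chunk-key-like arguments that are not defined in the graph."""
    missing: List[Tuple[Hashable, Any]] = []

    def walk(owner: Hashable, obj: Any) -> None:
        if _is_task(obj):
            for arg in obj[1:]:  # skip the callable itself
                walk(owner, arg)
        elif isinstance(obj, list):
            for item in obj:
                walk(owner, item)
        elif _looks_like_chunk_key(obj) and obj not in dsk:
            missing.append((owner, obj))

    for key, value in dsk.items():
        walk(key, value)
    return missing


# The failing task from the traceback, under a hypothetical output key.
dsk = {
    ("out", 0, 0, 0): (getitem, ("data_to_match_array", 0, 0, 0),
                       (0, slice(None), slice(None))),
}
print(find_missing_chunk_keys(dsk))
# [(('out', 0, 0, 0), ('data_to_match_array', 0, 0, 0))]
```

Because it only pattern-matches tuples, it can report false positives (a literal tuple that happens to look like a key) and will miss string keys; it is meant as a quick first check, not a validator.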
My dask arrays originate from within Xarray, so I suspect part of the task tree is being constructed incorrectly there. In fact, I can only reproduce the error if I pull my dask arrays from xarray objects (if I strip out the xarray layer, I can run my function without any problem). If that's the case, I'm mostly asking for a more informative error message when it happens.
The basic gist of what I'm trying to do is:
import dask.array as da
import numpy as np
new = da.map_blocks(my_func, a, b, c, chunks=a.chunks, dtype=a.dtype).compute()
where:
def my_func(a, b, c):
    new = np.empty_like(a)
    # does some things using b and c
    return new
I end up generating a task tree that looks like:
[Task graph image not preserved]
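For reference, here is a rough, simplified sketch of the graph shape that `map_blocks` produces when all inputs share the same chunks: one task per output block, each calling the function on the matching block of every input. This is not dask's implementation. `blockwise_graph` is a hypothetical helper, and the block function below is only a stand-in for `my_func`. What it illustrates is that every input key such as `('a', 0, 0)` must also be defined in the merged graph; per the `_execute_task` docstring in the traceback below, anything that is not a key simply passes through unchanged.

```python
from itertools import product
from typing import Any, Callable, Dict, Hashable, Tuple


def blockwise_graph(func: Callable, out_name: str, in_names: Tuple[str, ...],
                    numblocks: Tuple[int, ...]) -> Dict[Hashable, Any]:
    """Hypothetical helper: one task per output block, shaped like
    ('out', i, j) -> (func, ('a', i, j), ('b', i, j), ...)."""
    dsk: Dict[Hashable, Any] = {}
    # Iterate over every block index, e.g. (0, 0), (1, 0), ... for a 2-D grid.
    for idx in product(*(range(n) for n in numblocks)):
        # Each argument is a key naming the matching input block, not data.
        args = tuple((name,) + idx for name in in_names)
        dsk[(out_name,) + idx] = (func,) + args
    return dsk


def my_func(a, b, c):  # stand-in for the real block function
    return a


graph = blockwise_graph(my_func, "new", ("a", "b", "c"), (2, 1))
for key in sorted(graph):
    print(key, graph[key][1:])
# ('new', 0, 0) (('a', 0, 0), ('b', 0, 0), ('c', 0, 0))
# ('new', 1, 0) (('a', 1, 0), ('b', 1, 0), ('c', 1, 0))
```

Merging this with the input arrays' own graphs (which define the `('a', i, j)` keys) gives a complete graph; a key that appears as an argument here but is defined nowhere is the kind of inconsistency @mrocklin described.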
What I'm looking for here is a better way to tease out why dask is failing and how I (or Xarray) end up creating an invalid task tree.
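As a debugging aid, a graph can be executed one key at a time with a small synchronous executor that reports which key's task raised. This is a minimal sketch, not dask's scheduler. `debug_get` and `GraphExecutionError` are hypothetical names. It follows the task semantics described in the `_execute_task` docstring in the traceback (tuples with a callable first element are tasks, graph keys are substituted, anything else passes through), and it does not detect cycles. The example graph reproduces the failing task from the traceback under a hypothetical output key.

```python
from operator import getitem
from typing import Any, Dict, Hashable


class GraphExecutionError(RuntimeError):
    """Hypothetical error type that names the key whose task failed."""


def debug_get(dsk: Dict[Hashable, Any], key: Hashable) -> Any:
    """Compute `key` from a dask-style dict graph, one task at a time.

    Assumes no cycles; a cyclic graph would recurse without bound.
    """
    cache: Dict[Hashable, Any] = {}

    def resolve(obj: Any, owner: Hashable) -> Any:
        if isinstance(obj, list):
            return [resolve(item, owner) for item in obj]
        if isinstance(obj, tuple) and obj and callable(obj[0]):
            # Resolve arguments first; a failing dependency reports its own key.
            args = [resolve(a, owner) for a in obj[1:]]
            try:
                return obj[0](*args)
            except Exception as exc:
                raise GraphExecutionError(
                    f"task for key {owner!r} failed: {obj!r}") from exc
        try:
            is_key = obj in dsk
        except TypeError:  # unhashable values cannot be keys
            is_key = False
        if is_key:
            return compute(obj)  # a reference to another key
        return obj  # non-keys pass through unchanged

    def compute(k: Hashable) -> Any:
        if k not in cache:
            cache[k] = resolve(dsk[k], k)
        return cache[k]

    return compute(key)


# The failing task from the traceback, under a hypothetical output key.
dsk = {
    ("new", 0, 0, 0): (getitem, ("data_to_match_array", 0, 0, 0),
                       (0, slice(None), slice(None))),
}
caught = None
try:
    debug_get(dsk, ("new", 0, 0, 0))
except GraphExecutionError as err:
    caught = err
    print(err)                  # names ('new', 0, 0, 0) and shows its task
    print(repr(err.__cause__))  # the original TypeError
```

Running something like this on a graph pulled out of the failing computation should name the offending key instead of surfacing a bare TypeError deep inside the scheduler.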
Version info
OS: Mac OS 10.12.4
Python
Xarray: 0.9.5
Dask: 0.14.1
Traceback
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataarray.py:590: in compute
return new.load()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataarray.py:573: in load
ds = self._to_temp_dataset().load()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataset.py:469: in load
evaluated_data = da.compute(*lazy_data.values())
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/base.py:202: in compute
results = get(dsk, keys, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:544: in get_sync
raise_on_exception=True, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:487: in get_async
fire_task()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:483: in fire_task
callback=queue.put)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:532: in apply_sync
res = func(*args, **kwds)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:266: in execute_task
result = _execute_task(task, data)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in _execute_task
args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in <listcomp>
args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in _execute_task
args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in <listcomp>
args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:247: in _execute_task
return func(*args2)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/array/core.py:81: in getarray_inline
return getarray(a, b, lock=lock)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/array/core.py:64: in getarray
c = np.asarray(c)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/numpy/core/numeric.py:531: in asarray
return array(a, dtype, copy=False, order=order)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/common.py:94: in __array__
return np.asarray(self.values, dtype=dtype)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataarray.py:401: in values
return self.variable.values
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/variable.py:308: in values
return _as_array_or_item(self._data)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/variable.py:184: in _as_array_or_item
data = np.asarray(data)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/numpy/core/numeric.py:531: in asarray
return array(a, dtype, copy=False, order=order)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/array/core.py:1056: in __array__
x = self.compute()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/base.py:95: in compute
(result,) = compute(self, traverse=False, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/base.py:202: in compute
results = get(dsk, keys, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:544: in get_sync
raise_on_exception=True, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:487: in get_async
fire_task()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:483: in fire_task
callback=queue.put)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:532: in apply_sync
res = func(*args, **kwds)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:266: in execute_task
result = _execute_task(task, data)
-----
arg = (<built-in function getitem>, ('data_to_match_array', 0, 0, 0), (0, slice(None, None, None), slice(None, None, None))), cache = {}, dsk = None
def _execute_task(arg, cache, dsk=None):
    """ Do the actual work of collecting data and executing a function
    Examples
    --------
    >>> cache = {'x': 1, 'y': 2}
    Compute tasks against a cache
    >>> _execute_task((add, 'x', 1), cache) # Compute task in naive manner
    2
    >>> _execute_task((add, (inc, 'x'), 1), cache) # Support nested computation
    3
    Also grab data from cache
    >>> _execute_task('x', cache)
    1
    Support nested lists
    >>> list(_execute_task(['x', 'y'], cache))
    [1, 2]
    >>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
    [[1, 2], [2, 1]]
    >>> _execute_task('foo', cache) # Passes through on non-keys
    'foo'
    """
    if isinstance(arg, list):
        return [_execute_task(a, cache) for a in arg]
    elif istask(arg):
        func, args = arg[0], arg[1:]
        args2 = [_execute_task(a, cache) for a in args]
>       return func(*args2)
E       TypeError: tuple indices must be integers or slices, not tuple
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:247: TypeError
cc @shoyer
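Reading the last frame next to the docstring's "Passes through on non-keys" example suggests one plausible mechanism: `('data_to_match_array', 0, 0, 0)` was not a key in the graph being executed (note `cache = {}`), so it reached `getitem` as a plain tuple, and indexing a tuple with a tuple raises exactly this TypeError. The snippet below reproduces only that last step in plain Python, using the values shown in the traceback. The frames above also show the failure happening inside a nested `compute()` triggered by `np.asarray` on an xarray object (`xarray/core/common.py`, `__array__`), which may mean an xarray object wrapping a dask array ended up inside the graph as the data source of a `getarray` task. If so, passing the underlying dask arrays (e.g. `a.data`) to `da.map_blocks` may be worth trying; that reading is unconfirmed.

```python
from operator import getitem

# Values copied from the failing task in the traceback: the unresolved
# chunk key arrives as a plain tuple instead of an array chunk.
chunk = ("data_to_match_array", 0, 0, 0)
index = (0, slice(None), slice(None))

message = None
try:
    getitem(chunk, index)  # the same call the task makes
except TypeError as exc:
    message = str(exc)

print(message)  # tuple indices must be integers or slices, not tuple
```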
