TypeError raised in _execute_task #2298

@jhamman

I've run into an issue that may or may not be a bug. I'm using dask.array.map_blocks to execute a custom processing function. Dask is raising a TypeError in _execute_task, and I've had a difficult time tracking down its cause. @mrocklin suspects that I "have constructed a graph by hand and that that graph is not consistent. Some key is listed as a dependency but is not actually in the task graph".

My dask arrays originate from within Xarray, so I suspect part of the task tree is being constructed incorrectly there. In fact, I can only reproduce the error when I pull my dask arrays from xarray objects; if I strip out the xarray layer, my function runs without any problem. If that's the case, I'm mostly asking for a more informative error message.

The basic gist of what I'm trying to do is:

import dask.array as da
new = da.map_blocks(my_func, a, b, c, chunks=a.chunks, dtype=a.dtype).compute()

where:

import numpy as np

def my_func(a, b, c):
    new = np.empty_like(a)
    # fill new using b and c (details omitted)
    return new
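For intuition about where dependencies come from, here is a hypothetical, one-dimensional model of the kind of graph a map_blocks call like the one above produces. It is a simplification, not dask's implementation: blockwise_graph, add_blocks, and get are illustrative names. Each output block is a task tuple whose arguments are the keys of the matching input blocks, so the graph is only consistent if every one of those input keys is also defined in it.

```python
def blockwise_graph(out_name, func, input_names, nblocks):
    """Toy 1-D blockwise graph: output block i depends on block i of each input.

    Keys are (name, i) tuples and tasks are tuples whose first element is a
    callable, the same layout used in the _execute_task docstring.
    """
    return {
        (out_name, i): (func,) + tuple((name, i) for name in input_names)
        for i in range(nblocks)
    }


def get(dsk, key):
    """Recursively evaluate one key of a toy graph (every task arg is a key)."""
    value = dsk[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        return value[0](*(get(dsk, k) for k in value[1:]))
    return value


def add_blocks(x, y):
    # Hypothetical stand-in for my_func: combine two blocks elementwise.
    return [p + q for p, q in zip(x, y)]


# Two inputs, each split into two chunks (the "blocks").
graph = {
    ("a", 0): [1, 2], ("a", 1): [3],
    ("b", 0): [10, 20], ("b", 1): [30],
}
graph.update(blockwise_graph("new", add_blocks, ["a", "b"], nblocks=2))

print(get(graph, ("new", 0)))  # [11, 22]
print(get(graph, ("new", 1)))  # [33]
```

Deleting ("a", 0) from graph before calling get makes this toy fail with a KeyError from dsk[key]; the traceback below suggests that dask instead handed the unresolved key on to the next function.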

I end up generating a task tree that looks like:

[Figure: task graph visualization (mydask)]

What I'm looking for is a better way to tease out why dask is failing and how I (or Xarray) might be creating an invalid task tree.
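One way to tease out an inconsistent graph before running it is to scan every task for arguments that look like keys but are not defined in the graph. The sketch below does this with a heuristic; find_missing_keys, looks_like_key, and is_task are hypothetical helpers, not dask functions. It treats a tuple of a string followed by integers, such as ('data_to_match_array', 0, 0, 0), as a key reference, and it ignores plain-string keys, which cannot be told apart from string literals this way. The example graph reuses the failing task from the traceback; its outer key ('new', 0, 0, 0) is made up.

```python
from operator import getitem


def is_task(obj):
    # A task is a tuple whose first element is callable.
    return isinstance(obj, tuple) and len(obj) > 0 and callable(obj[0])


def looks_like_key(obj):
    # Heuristic: dask-array-style keys look like ('name', 0, 0, 0).
    return (
        isinstance(obj, tuple)
        and len(obj) > 1
        and isinstance(obj[0], str)
        and all(isinstance(i, int) for i in obj[1:])
    )


def find_missing_keys(dsk):
    """Map each top-level key to the key-like arguments it references that
    are not defined in dsk. Nested tasks and lists are searched recursively."""
    missing = {}

    def walk(node, owner):
        if is_task(node):
            for arg in node[1:]:
                walk(arg, owner)
        elif isinstance(node, list):
            for item in node:
                walk(item, owner)
        elif looks_like_key(node) and node not in dsk:
            missing.setdefault(owner, set()).add(node)

    for key, task in dsk.items():
        walk(task, key)
    return missing


# The failing task from the traceback, under a made-up output key.
dsk = {
    ("new", 0, 0, 0): (
        getitem,
        ("data_to_match_array", 0, 0, 0),
        (0, slice(None, None, None), slice(None, None, None)),
    ),
}
print(find_missing_keys(dsk))
# {('new', 0, 0, 0): {('data_to_match_array', 0, 0, 0)}}
```

For a dask array x, the graph is available as x.dask, so something like find_missing_keys(dict(x.dask)) could be tried on each array passed to map_blocks.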

Version info
OS: Mac OS 10.12.4
Python
Xarray: 0.9.5
Dask: 0.14.1

Traceback

../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataarray.py:590: in compute
    return new.load()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataarray.py:573: in load
    ds = self._to_temp_dataset().load()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataset.py:469: in load
    evaluated_data = da.compute(*lazy_data.values())
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/base.py:202: in compute
    results = get(dsk, keys, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:544: in get_sync
    raise_on_exception=True, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:487: in get_async
    fire_task()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:483: in fire_task
    callback=queue.put)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:532: in apply_sync
    res = func(*args, **kwds)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:266: in execute_task
    result = _execute_task(task, data)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in <listcomp>
    args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:246: in <listcomp>
    args2 = [_execute_task(a, cache) for a in args]
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:247: in _execute_task
    return func(*args2)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/array/core.py:81: in getarray_inline
    return getarray(a, b, lock=lock)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/array/core.py:64: in getarray
    c = np.asarray(c)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/numpy/core/numeric.py:531: in asarray
    return array(a, dtype, copy=False, order=order)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/common.py:94: in __array__
    return np.asarray(self.values, dtype=dtype)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/dataarray.py:401: in values
    return self.variable.values
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/variable.py:308: in values
    return _as_array_or_item(self._data)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/xarray/core/variable.py:184: in _as_array_or_item
    data = np.asarray(data)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/numpy/core/numeric.py:531: in asarray
    return array(a, dtype, copy=False, order=order)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/array/core.py:1056: in __array__
    x = self.compute()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/base.py:95: in compute
    (result,) = compute(self, traverse=False, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/base.py:202: in compute
    results = get(dsk, keys, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:544: in get_sync
    raise_on_exception=True, **kwargs)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:487: in get_async
    fire_task()
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:483: in fire_task
    callback=queue.put)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:532: in apply_sync
    res = func(*args, **kwds)
../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:266: in execute_task
    result = _execute_task(task, data)

-----

arg = (<built-in function getitem>, ('data_to_match_array', 0, 0, 0), (0, slice(None, None, None), slice(None, None, None))), cache = {}, dsk = None

    def _execute_task(arg, cache, dsk=None):
        """ Do the actual work of collecting data and executing a function
    
        Examples
        --------
    
        >>> cache = {'x': 1, 'y': 2}
    
        Compute tasks against a cache
        >>> _execute_task((add, 'x', 1), cache)  # Compute task in naive manner
        2
        >>> _execute_task((add, (inc, 'x'), 1), cache)  # Support nested computation
        3
    
        Also grab data from cache
        >>> _execute_task('x', cache)
        1
    
        Support nested lists
        >>> list(_execute_task(['x', 'y'], cache))
        [1, 2]
    
        >>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
        [[1, 2], [2, 1]]
    
        >>> _execute_task('foo', cache)  # Passes through on non-keys
        'foo'
        """
        if isinstance(arg, list):
            return [_execute_task(a, cache) for a in arg]
        elif istask(arg):
            func, args = arg[0], arg[1:]
            args2 = [_execute_task(a, cache) for a in args]
>           return func(*args2)
E           TypeError: tuple indices must be integers or slices, not tuple

../../../../anaconda/envs/storylines/lib/python3.6/site-packages/dask/async.py:247: TypeError
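The last frame fits @mrocklin's diagnosis. The task being executed is (getitem, ('data_to_match_array', 0, 0, 0), (0, slice(...), slice(...))) with cache = {}, so the key ('data_to_match_array', 0, 0, 0) was never resolved to a block of data. The sketch below is a toy model of _execute_task, not dask's code; execute, looks_like_key, and the strict flag are hypothetical. In its default mode, an unresolved key is passed through unchanged, as in the docstring's "Passes through on non-keys" example, so getitem receives the key tuple itself, and indexing a tuple with a tuple raises the same TypeError. The strict mode shows one possible shape for the more informative error requested above: it raises a KeyError naming the missing dependency.

```python
from operator import getitem


def is_task(obj):
    # A task is a tuple whose first element is callable, e.g. (getitem, x, idx).
    return isinstance(obj, tuple) and len(obj) > 0 and callable(obj[0])


def looks_like_key(obj):
    # Heuristic: dask-array-style keys look like ('name', 0, 0, 0).
    return (isinstance(obj, tuple) and len(obj) > 1 and isinstance(obj[0], str)
            and all(isinstance(i, int) for i in obj[1:]))


def execute(arg, cache, strict=False):
    """Toy model of _execute_task. With strict=False a key missing from the
    cache is passed through as a literal value. With strict=True a key-shaped
    argument that is missing raises a KeyError naming it instead."""
    if isinstance(arg, list):
        return [execute(a, cache, strict) for a in arg]
    if is_task(arg):
        return arg[0](*[execute(a, cache, strict) for a in arg[1:]])
    if strict and looks_like_key(arg) and arg not in cache:
        raise KeyError(f"task depends on missing key {arg!r}")
    try:
        if arg in cache:
            return cache[arg]
    except TypeError:  # unhashable literal, e.g. a tuple of slices before Python 3.12
        pass
    return arg


# The failing task from the traceback, evaluated with an empty cache.
task = (getitem, ("data_to_match_array", 0, 0, 0),
        (0, slice(None, None, None), slice(None, None, None)))

try:
    execute(task, cache={})
except TypeError as exc:
    print("lenient:", exc)  # lenient: tuple indices must be integers or slices, not tuple

try:
    execute(task, cache={}, strict=True)
except KeyError as exc:
    print("strict:", exc)
```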

cc @shoyer
