Skip to content

Add support for NumPy 1.20.0#7084

Closed
QuLogic wants to merge 3 commits into dask:master from
QuLogic:np120
Closed

Add support for NumPy 1.20.0#7084
QuLogic wants to merge 3 commits into dask:master from
QuLogic:np120

Conversation

@QuLogic
Copy link
Contributor

@QuLogic QuLogic commented Jan 19, 2021

The NumPy 1.20.0 release candidate is already in Fedora Rawhide for testing, and 34 tests fail. This fixes most of them.

  • Tests added / passed
  • [?] Passes black dask / flake8 dask

@QuLogic
Copy link
Contributor Author

QuLogic commented Jan 19, 2021

There are 6 remaining failures, though they're essentially the same:

_________ test_array_cumreduction_axis[sequential-None-False-cumprod] __________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = False, axis = None, method = 'sequential'
    @pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
    @pytest.mark.parametrize("use_nan", [False, True])
    @pytest.mark.parametrize("axis", [None, 0, 1, -1])
    @pytest.mark.parametrize("method", ["sequential", "blelloch"])
    def test_array_cumreduction_axis(func, use_nan, axis, method):
        np_func = getattr(np, func)
        da_func = getattr(da, func)
    
        s = (10, 11, 12)
        a = np.arange(np.prod(s), dtype=float).reshape(s)
        if use_nan:
            a[1] = np.nan
        d = da.from_array(a, chunks=(4, 5, 6))
    
        a_r = np_func(a, axis=axis)
        d_r = da_func(d, axis=axis, method=method)
    
>       assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/tests/test_reductions.py:18: in assert_eq
    _assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:279: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:457: in fire_task
    apply_async(
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
a = array([0.])
b = array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869912e+009,
       1.33161136e+012, 3.58203457e+014, 9....     inf,             inf,             inf,
                   inf,             inf,             inf,             inf])
    def _cumprod_merge(a, b):
        if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
            values = np.ma.getdata(a) * np.ma.getdata(b)
            return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))
>       return a * b
E       RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1398: RuntimeWarning
________ test_array_cumreduction_axis[sequential-None-False-nancumprod] ________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = False, axis = None, method = 'sequential'
    @pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
    @pytest.mark.parametrize("use_nan", [False, True])
    @pytest.mark.parametrize("axis", [None, 0, 1, -1])
    @pytest.mark.parametrize("method", ["sequential", "blelloch"])
    def test_array_cumreduction_axis(func, use_nan, axis, method):
        np_func = getattr(np, func)
        da_func = getattr(da, func)
    
        s = (10, 11, 12)
        a = np.arange(np.prod(s), dtype=float).reshape(s)
        if use_nan:
            a[1] = np.nan
        d = da.from_array(a, chunks=(4, 5, 6))
    
        a_r = np_func(a, axis=axis)
        d_r = da_func(d, axis=axis, method=method)
    
>       assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/tests/test_reductions.py:18: in assert_eq
    _assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:279: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:457: in fire_task
    apply_async(
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
arg = (<built-in function mul>, ('nancumprod-f9ff59bf79794516542461a63b8a4646', 'extra', 2), ('nancumprod-6ea9b00ccb738b2c003c7ca94a26a289', 2))
cache = {('nancumprod-6ea9b00ccb738b2c003c7ca94a26a289', 2): array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869...     inf,             inf,             inf]), ('nancumprod-f9ff59bf79794516542461a63b8a4646', 'extra', 2): array([0.])}
dsk = None
    def _execute_task(arg, cache, dsk=None):
        """Do the actual work of collecting data and executing a function
    
        Examples
        --------
    
        >>> cache = {'x': 1, 'y': 2}
    
        Compute tasks against a cache
        >>> _execute_task((add, 'x', 1), cache)  # Compute task in naive manner
        2
        >>> _execute_task((add, (inc, 'x'), 1), cache)  # Support nested computation
        3
    
        Also grab data from cache
        >>> _execute_task('x', cache)
        1
    
        Support nested lists
        >>> list(_execute_task(['x', 'y'], cache))
        [1, 2]
    
        >>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
        [[1, 2], [2, 1]]
    
        >>> _execute_task('foo', cache)  # Passes through on non-keys
        'foo'
        """
        if isinstance(arg, list):
            return [_execute_task(a, cache) for a in arg]
        elif istask(arg):
            func, args = arg[0], arg[1:]
            # Note: Don't assign the subtask results to a variable. numpy detects
            # temporaries by their reference count and can execute certain
            # operations in-place.
>           return func(*(_execute_task(a, cache) for a in args))
E           RuntimeWarning: invalid value encountered in multiply
dask/core.py:121: RuntimeWarning
________ test_array_cumreduction_axis[sequential-None-True-nancumprod] _________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = True, axis = None, method = 'sequential'
    @pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
    @pytest.mark.parametrize("use_nan", [False, True])
    @pytest.mark.parametrize("axis", [None, 0, 1, -1])
    @pytest.mark.parametrize("method", ["sequential", "blelloch"])
    def test_array_cumreduction_axis(func, use_nan, axis, method):
        np_func = getattr(np, func)
        da_func = getattr(da, func)
    
        s = (10, 11, 12)
        a = np.arange(np.prod(s), dtype=float).reshape(s)
        if use_nan:
            a[1] = np.nan
        d = da.from_array(a, chunks=(4, 5, 6))
    
        a_r = np_func(a, axis=axis)
        d_r = da_func(d, axis=axis, method=method)
    
>       assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/tests/test_reductions.py:18: in assert_eq
    _assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:279: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:457: in fire_task
    apply_async(
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
arg = (<built-in function mul>, ('nancumprod-d63b9da7ed3d89f94324b8fe7b8ee65a', 'extra', 2), ('nancumprod-041d650acfcfb8560bb397de6fac230f', 2))
cache = {('nancumprod-041d650acfcfb8560bb397de6fac230f', 2): array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869...     inf,             inf,             inf]), ('nancumprod-d63b9da7ed3d89f94324b8fe7b8ee65a', 'extra', 2): array([0.])}
dsk = None
    def _execute_task(arg, cache, dsk=None):
        """Do the actual work of collecting data and executing a function
    
        Examples
        --------
    
        >>> cache = {'x': 1, 'y': 2}
    
        Compute tasks against a cache
        >>> _execute_task((add, 'x', 1), cache)  # Compute task in naive manner
        2
        >>> _execute_task((add, (inc, 'x'), 1), cache)  # Support nested computation
        3
    
        Also grab data from cache
        >>> _execute_task('x', cache)
        1
    
        Support nested lists
        >>> list(_execute_task(['x', 'y'], cache))
        [1, 2]
    
        >>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
        [[1, 2], [2, 1]]
    
        >>> _execute_task('foo', cache)  # Passes through on non-keys
        'foo'
        """
        if isinstance(arg, list):
            return [_execute_task(a, cache) for a in arg]
        elif istask(arg):
            func, args = arg[0], arg[1:]
            # Note: Don't assign the subtask results to a variable. numpy detects
            # temporaries by their reference count and can execute certain
            # operations in-place.
>           return func(*(_execute_task(a, cache) for a in args))
E           RuntimeWarning: invalid value encountered in multiply
dask/core.py:121: RuntimeWarning
__________ test_array_cumreduction_axis[blelloch-None-False-cumprod] ___________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = False, axis = None, method = 'blelloch'
    @pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
    @pytest.mark.parametrize("use_nan", [False, True])
    @pytest.mark.parametrize("axis", [None, 0, 1, -1])
    @pytest.mark.parametrize("method", ["sequential", "blelloch"])
    def test_array_cumreduction_axis(func, use_nan, axis, method):
        np_func = getattr(np, func)
        da_func = getattr(da, func)
    
        s = (10, 11, 12)
        a = np.arange(np.prod(s), dtype=float).reshape(s)
        if use_nan:
            a[1] = np.nan
        d = da.from_array(a, chunks=(4, 5, 6))
    
        a_r = np_func(a, axis=axis)
        d_r = da_func(d, axis=axis, method=method)
    
>       assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/tests/test_reductions.py:18: in assert_eq
    _assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:279: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:457: in fire_task
    apply_async(
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/array/reductions.py:1145: in _prefixscan_combine
    return binop(pre, func(x, axis=axis, dtype=dtype))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
a = array([0.])
b = array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869912e+009,
       1.33161136e+012, 3.58203457e+014, 9....     inf,             inf,             inf,
                   inf,             inf,             inf,             inf])
    def _cumprod_merge(a, b):
        if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
            values = np.ma.getdata(a) * np.ma.getdata(b)
            return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))
>       return a * b
E       RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1398: RuntimeWarning
_________ test_array_cumreduction_axis[blelloch-None-False-nancumprod] _________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = False, axis = None, method = 'blelloch'
    @pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
    @pytest.mark.parametrize("use_nan", [False, True])
    @pytest.mark.parametrize("axis", [None, 0, 1, -1])
    @pytest.mark.parametrize("method", ["sequential", "blelloch"])
    def test_array_cumreduction_axis(func, use_nan, axis, method):
        np_func = getattr(np, func)
        da_func = getattr(da, func)
    
        s = (10, 11, 12)
        a = np.arange(np.prod(s), dtype=float).reshape(s)
        if use_nan:
            a[1] = np.nan
        d = da.from_array(a, chunks=(4, 5, 6))
    
        a_r = np_func(a, axis=axis)
        d_r = da_func(d, axis=axis, method=method)
    
>       assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/tests/test_reductions.py:18: in assert_eq
    _assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:279: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:457: in fire_task
    apply_async(
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
func = <function nancumprod at 0x7f22206bb040>, binop = <built-in function mul>
pre = array([0.])
x = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
       275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
       385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
axis = 0, dtype = dtype('float64')
    def _prefixscan_combine(func, binop, pre, x, axis, dtype):
        """Combine results of a parallel prefix scan such as cumsum
    
        Parameters
        ----------
        func : callable
            Cumulative function (e.g. ``np.cumsum``)
        binop : callable
            Associative function (e.g. ``add``)
        pre : np.array
            The value calculated in parallel from ``preop``.
            For example, the sum of all the previous blocks.
        x : np.array
            Current block
        axis : int
        dtype : dtype
    
        Returns
        -------
        np.array
        """
        # We could compute this in two tasks.
        # This would allow us to do useful work (i.e., func), while waiting on `pre`.
        # Using one task may guide the scheduler to do better and reduce scheduling overhead.
>       return binop(pre, func(x, axis=axis, dtype=dtype))
E       RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1145: RuntimeWarning
___________ test_array_cumreduction_axis[blelloch-None-True-cumprod] ___________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = True, axis = None, method = 'blelloch'
    @pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
    @pytest.mark.parametrize("use_nan", [False, True])
    @pytest.mark.parametrize("axis", [None, 0, 1, -1])
    @pytest.mark.parametrize("method", ["sequential", "blelloch"])
    def test_array_cumreduction_axis(func, use_nan, axis, method):
        np_func = getattr(np, func)
        da_func = getattr(da, func)
    
        s = (10, 11, 12)
        a = np.arange(np.prod(s), dtype=float).reshape(s)
        if use_nan:
            a[1] = np.nan
        d = da.from_array(a, chunks=(4, 5, 6))
    
        a_r = np_func(a, axis=axis)
        d_r = da_func(d, axis=axis, method=method)
    
>       assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/tests/test_reductions.py:18: in assert_eq
    _assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:279: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:457: in fire_task
    apply_async(
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
    result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:31: in apply
    return func(*args, **kwargs)
<__array_function__ internals>:5: in prod
    ???
/usr/lib64/python3.9/site-packages/numpy/core/fromnumeric.py:3030: in prod
    return _wrapreduction(a, np.multiply, 'prod', axis, dtype, out,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
obj = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
       275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
       385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
ufunc = <ufunc 'multiply'>, method = 'prod', axis = 0, dtype = None, out = None
kwargs = {'initial': <no value>, 'keepdims': True, 'where': <no value>}
passkwargs = {'keepdims': True}
    def _wrapreduction(obj, ufunc, method, axis, dtype, out, **kwargs):
        passkwargs = {k: v for k, v in kwargs.items()
                      if v is not np._NoValue}
    
        if type(obj) is not mu.ndarray:
            try:
                reduction = getattr(obj, method)
            except AttributeError:
                pass
            else:
                # This branch is needed for reductions like any which don't
                # support a dtype.
                if dtype is not None:
                    return reduction(axis=axis, dtype=dtype, out=out, **passkwargs)
                else:
                    return reduction(axis=axis, out=out, **passkwargs)
    
>       return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
E       RuntimeWarning: overflow encountered in reduce
/usr/lib64/python3.9/site-packages/numpy/core/fromnumeric.py:87: RuntimeWarning
_________ test_array_cumreduction_axis[blelloch-None-True-nancumprod] __________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = True, axis = None, method = 'blelloch'
    @pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
    @pytest.mark.parametrize("use_nan", [False, True])
    @pytest.mark.parametrize("axis", [None, 0, 1, -1])
    @pytest.mark.parametrize("method", ["sequential", "blelloch"])
    def test_array_cumreduction_axis(func, use_nan, axis, method):
        np_func = getattr(np, func)
        da_func = getattr(da, func)
    
        s = (10, 11, 12)
        a = np.arange(np.prod(s), dtype=float).reshape(s)
        if use_nan:
            a[1] = np.nan
        d = da.from_array(a, chunks=(4, 5, 6))
    
        a_r = np_func(a, axis=axis)
        d_r = da_func(d, axis=axis, method=method)
    
>       assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/tests/test_reductions.py:18: in assert_eq
    _assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
    b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:279: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:457: in fire_task
    apply_async(
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
func = <function nancumprod at 0x7f22206bb040>, binop = <built-in function mul>
pre = array([0.])
x = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
       275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
       385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
axis = 0, dtype = dtype('float64')
    def _prefixscan_combine(func, binop, pre, x, axis, dtype):
        """Combine results of a parallel prefix scan such as cumsum
    
        Parameters
        ----------
        func : callable
            Cumulative function (e.g. ``np.cumsum``)
        binop : callable
            Associative function (e.g. ``add``)
        pre : np.array
            The value calculated in parallel from ``preop``.
            For example, the sum of all the previous blocks.
        x : np.array
            Current block
        axis : int
        dtype : dtype
    
        Returns
        -------
        np.array
        """
        # We could compute this in two tasks.
        # This would allow us to do useful work (i.e., func), while waiting on `pre`.
        # Using one task may guide the scheduler to do better and reduce scheduling overhead.
>       return binop(pre, func(x, axis=axis, dtype=dtype))
E       RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1145: RuntimeWarning

Should this test simply set NumPy to ignore invalid values?

@QuLogic
Copy link
Contributor Author

QuLogic commented Jan 19, 2021

Ignoring the invalid/overflow warnings, I see that Dask is very wrong somehow. For use_nan=False, NumPy returns all 0s, but Dask returns some NaNs. For use_nan=True and nancumprod, NumPy returns all 0s, but Dask returns some NaNs.

@jsignell
Copy link
Member

I think the issue is that there are inf values legitimately being generated in the partitions, since this is now a float. Then, when we go to combine the partitions, the inf has issues. It seems like this behavior is different for ints.

Consider:

In [1]: s = (10, 11, 12)
   ...: a = np.arange(1, np.prod(s)+1, dtype=float).reshape(s)

In [2]: np.cumprod(a)
Out[2]: array([ 1.,  2.,  6., ..., inf, inf, inf])

In [3]: s = (10, 11, 12)
   ...: a = np.arange(1, np.prod(s)+1).reshape(s)

In [4]: np.cumprod(a)
Out[4]: array([1, 2, 6, ..., 0, 0, 0])

In the case of this test I think it would probably be resolved by having smaller partitions or by having a smaller array.

@jsignell
Copy link
Member

Oh and then this is exacerbated because we try to multiply the result of the previous partitions (np.array([0.])) by the result of the other partition, and this raises when it gets to inf * 0:

In [5]: np.cumprod(a) * np.array([0.])
<ipython-input-32-aa8604efc829>:1: RuntimeWarning: invalid value encountered in multiply
  np.cumprod(a) * np.array([0.])
Out[5]: array([ 0.,  0.,  0., ..., nan, nan, nan])

@jsignell
Copy link
Member

In dask we are taking the cumprod of each partition and then multiplying the last value by the output of the previous partition. A better way would be to multiply the first value by the output of the previous partition before computing the cumprod on the next.

@jsignell
Copy link
Member

But then I guess you are committed to computing linearly...

@jsignell jsignell mentioned this pull request Jan 20, 2021
2 tasks
@jsignell
Copy link
Member

Ok I put up a fix for the tests in #7089. I didn't want to just push it to your branch but here's the diff in case that's easier:

diff --git a/dask/array/reductions.py b/dask/array/reductions.py
index dd2b9f04..908e6873 100644
--- a/dask/array/reductions.py
+++ b/dask/array/reductions.py
@@ -1194,7 +1194,7 @@ def prefixscan_blelloch(func, preop, binop, x, axis=None, dtype=None, out=None):
     dask array
     """
     if axis is None:
-        x = x.flatten()
+        x = x.flatten().rechunk(chunks=x.npartitions)
         axis = 0
     if dtype is None:
         dtype = getattr(func(np.empty((0,), dtype=x.dtype)), "dtype", object)
@@ -1340,7 +1340,7 @@ def cumreduction(
         )
 
     if axis is None:
-        x = x.flatten()
+        x = x.flatten().rechunk(chunks=x.npartitions)
         axis = 0
     if dtype is None:
         dtype = getattr(func(np.empty((0,), dtype=x.dtype)), "dtype", object)
diff --git a/dask/array/tests/test_reductions.py b/dask/array/tests/test_reductions.py
index a773a203..c166b863 100644
--- a/dask/array/tests/test_reductions.py
+++ b/dask/array/tests/test_reductions.py
@@ -546,6 +546,10 @@ def test_array_cumreduction_axis(func, use_nan, axis, method):
     if use_nan:
         a[1] = np.nan
     d = da.from_array(a, chunks=(4, 5, 6))
+    if func in ["cumprod", "nancumprod"] and method == "blelloch" and axis is None:
+        with pytest.warns(RuntimeWarning):
+            da_func(d, axis=axis, method=method).compute()
+            return
 
     a_r = np_func(a, axis=axis)
     d_r = da_func(d, axis=axis, method=method)

@QuLogic
Copy link
Contributor Author

QuLogic commented Jan 21, 2021

OK, so there are two bugs here; that test was relying on np.nan getting converted to a large negative integer, and *cumprod doesn't work correctly with floats. Maybe I should use a large negative integer here, and then you can work on floats separately in #7089?

@QuLogic
Copy link
Contributor Author

QuLogic commented Jan 21, 2021

that test was relying on np.nan getting converted to a large negative integer,

Or on the other hand, maybe that means the test was broken and not really checking what it intended to.

@jsignell
Copy link
Member

Or on the other hand, maybe that means the test was broken and not really checking what it intended to.

I think that's the correct interpretation.

My preference is to merge this PR with the partial fix from #7089 and write up a separate issue to fix cumprod.

@jsignell
Copy link
Member

superseded by #7089

@jsignell jsignell closed this Jan 22, 2021
@QuLogic QuLogic deleted the np120 branch January 26, 2021 08:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants