Conversation
|
There are 6 remaining failures, though they're essentially the same: _________ test_array_cumreduction_axis[sequential-None-False-cumprod] __________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = False, axis = None, method = 'sequential'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
a = array([0.])
b = array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869912e+009,
1.33161136e+012, 3.58203457e+014, 9.... inf, inf, inf,
inf, inf, inf, inf])
def _cumprod_merge(a, b):
if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
values = np.ma.getdata(a) * np.ma.getdata(b)
return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))
> return a * b
E RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1398: RuntimeWarning
________ test_array_cumreduction_axis[sequential-None-False-nancumprod] ________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = False, axis = None, method = 'sequential'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
arg = (<built-in function mul>, ('nancumprod-f9ff59bf79794516542461a63b8a4646', 'extra', 2), ('nancumprod-6ea9b00ccb738b2c003c7ca94a26a289', 2))
cache = {('nancumprod-6ea9b00ccb738b2c003c7ca94a26a289', 2): array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869... inf, inf, inf]), ('nancumprod-f9ff59bf79794516542461a63b8a4646', 'extra', 2): array([0.])}
dsk = None
def _execute_task(arg, cache, dsk=None):
"""Do the actual work of collecting data and executing a function
Examples
--------
>>> cache = {'x': 1, 'y': 2}
Compute tasks against a cache
>>> _execute_task((add, 'x', 1), cache) # Compute task in naive manner
2
>>> _execute_task((add, (inc, 'x'), 1), cache) # Support nested computation
3
Also grab data from cache
>>> _execute_task('x', cache)
1
Support nested lists
>>> list(_execute_task(['x', 'y'], cache))
[1, 2]
>>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
[[1, 2], [2, 1]]
>>> _execute_task('foo', cache) # Passes through on non-keys
'foo'
"""
if isinstance(arg, list):
return [_execute_task(a, cache) for a in arg]
elif istask(arg):
func, args = arg[0], arg[1:]
# Note: Don't assign the subtask results to a variable. numpy detects
# temporaries by their reference count and can execute certain
# operations in-place.
> return func(*(_execute_task(a, cache) for a in args))
E RuntimeWarning: invalid value encountered in multiply
dask/core.py:121: RuntimeWarning
________ test_array_cumreduction_axis[sequential-None-True-nancumprod] _________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = True, axis = None, method = 'sequential'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
arg = (<built-in function mul>, ('nancumprod-d63b9da7ed3d89f94324b8fe7b8ee65a', 'extra', 2), ('nancumprod-041d650acfcfb8560bb397de6fac230f', 2))
cache = {('nancumprod-041d650acfcfb8560bb397de6fac230f', 2): array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869... inf, inf, inf]), ('nancumprod-d63b9da7ed3d89f94324b8fe7b8ee65a', 'extra', 2): array([0.])}
dsk = None
def _execute_task(arg, cache, dsk=None):
"""Do the actual work of collecting data and executing a function
Examples
--------
>>> cache = {'x': 1, 'y': 2}
Compute tasks against a cache
>>> _execute_task((add, 'x', 1), cache) # Compute task in naive manner
2
>>> _execute_task((add, (inc, 'x'), 1), cache) # Support nested computation
3
Also grab data from cache
>>> _execute_task('x', cache)
1
Support nested lists
>>> list(_execute_task(['x', 'y'], cache))
[1, 2]
>>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
[[1, 2], [2, 1]]
>>> _execute_task('foo', cache) # Passes through on non-keys
'foo'
"""
if isinstance(arg, list):
return [_execute_task(a, cache) for a in arg]
elif istask(arg):
func, args = arg[0], arg[1:]
# Note: Don't assign the subtask results to a variable. numpy detects
# temporaries by their reference count and can execute certain
# operations in-place.
> return func(*(_execute_task(a, cache) for a in args))
E RuntimeWarning: invalid value encountered in multiply
dask/core.py:121: RuntimeWarning
__________ test_array_cumreduction_axis[blelloch-None-False-cumprod] ___________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = False, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/array/reductions.py:1145: in _prefixscan_combine
return binop(pre, func(x, axis=axis, dtype=dtype))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
a = array([0.])
b = array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869912e+009,
1.33161136e+012, 3.58203457e+014, 9.... inf, inf, inf,
inf, inf, inf, inf])
def _cumprod_merge(a, b):
if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
values = np.ma.getdata(a) * np.ma.getdata(b)
return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))
> return a * b
E RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1398: RuntimeWarning
_________ test_array_cumreduction_axis[blelloch-None-False-nancumprod] _________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = False, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
func = <function nancumprod at 0x7f22206bb040>, binop = <built-in function mul>
pre = array([0.])
x = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
axis = 0, dtype = dtype('float64')
def _prefixscan_combine(func, binop, pre, x, axis, dtype):
"""Combine results of a parallel prefix scan such as cumsum
Parameters
----------
func : callable
Cumulative function (e.g. ``np.cumsum``)
binop : callable
Associative function (e.g. ``add``)
pre : np.array
The value calculated in parallel from ``preop``.
For example, the sum of all the previous blocks.
x : np.array
Current block
axis : int
dtype : dtype
Returns
-------
np.array
"""
# We could compute this in two tasks.
# This would allow us to do useful work (i.e., func), while waiting on `pre`.
# Using one task may guide the scheduler to do better and reduce scheduling overhead.
> return binop(pre, func(x, axis=axis, dtype=dtype))
E RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1145: RuntimeWarning
___________ test_array_cumreduction_axis[blelloch-None-True-cumprod] ___________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = True, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:31: in apply
return func(*args, **kwargs)
<__array_function__ internals>:5: in prod
???
/usr/lib64/python3.9/site-packages/numpy/core/fromnumeric.py:3030: in prod
return _wrapreduction(a, np.multiply, 'prod', axis, dtype, out,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
ufunc = <ufunc 'multiply'>, method = 'prod', axis = 0, dtype = None, out = None
kwargs = {'initial': <no value>, 'keepdims': True, 'where': <no value>}
passkwargs = {'keepdims': True}
def _wrapreduction(obj, ufunc, method, axis, dtype, out, **kwargs):
passkwargs = {k: v for k, v in kwargs.items()
if v is not np._NoValue}
if type(obj) is not mu.ndarray:
try:
reduction = getattr(obj, method)
except AttributeError:
pass
else:
# This branch is needed for reductions like any which don't
# support a dtype.
if dtype is not None:
return reduction(axis=axis, dtype=dtype, out=out, **passkwargs)
else:
return reduction(axis=axis, out=out, **passkwargs)
> return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
E RuntimeWarning: overflow encountered in reduce
/usr/lib64/python3.9/site-packages/numpy/core/fromnumeric.py:87: RuntimeWarning
_________ test_array_cumreduction_axis[blelloch-None-True-nancumprod] __________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = True, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
func = <function nancumprod at 0x7f22206bb040>, binop = <built-in function mul>
pre = array([0.])
x = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
axis = 0, dtype = dtype('float64')
def _prefixscan_combine(func, binop, pre, x, axis, dtype):
"""Combine results of a parallel prefix scan such as cumsum
Parameters
----------
func : callable
Cumulative function (e.g. ``np.cumsum``)
binop : callable
Associative function (e.g. ``add``)
pre : np.array
The value calculated in parallel from ``preop``.
For example, the sum of all the previous blocks.
x : np.array
Current block
axis : int
dtype : dtype
Returns
-------
np.array
"""
# We could compute this in two tasks.
# This would allow us to do useful work (i.e., func), while waiting on `pre`.
# Using one task may guide the scheduler to do better and reduce scheduling overhead.
> return binop(pre, func(x, axis=axis, dtype=dtype))
E RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1145: RuntimeWarning

Should this test simply set NumPy to ignore invalid values? |
|
Ignoring the invalid/overflow warnings, I see that Dask is very wrong somehow: for these cases the computed values disagree with NumPy's result. |
|
I think the issue is that there are intermediate overflows. Consider:

In [1]: s = (10, 11, 12)
   ...: a = np.arange(1, np.prod(s)+1, dtype=float).reshape(s)

In [2]: np.cumprod(a)
Out[2]: array([ 1., 2., 6., ..., inf, inf, inf])

In [3]: s = (10, 11, 12)
   ...: a = np.arange(1, np.prod(s)+1).reshape(s)

In [4]: np.cumprod(a)
Out[4]: array([1, 2, 6, ..., 0, 0, 0])

In the case of this test I think it would probably be resolved by having smaller partitions or by having a smaller array. |
|
Oh and then this is exacerbated because we try to multiply the result of the previous partitions (here `array([0.])`) by the current block's cumulative product, which has overflowed to `inf` — and `0 * inf` is `nan`:

In [5]: np.cumprod(a) * np.array([0.])
<ipython-input-32-aa8604efc829>:1: RuntimeWarning: invalid value encountered in multiply
  np.cumprod(a) * np.array([0.])
Out[5]: array([ 0., 0., 0., ..., nan, nan, nan]) |
|
In dask we are taking the cumulative product within each block and then merging with the product carried over from the preceding blocks, so an overflowed `inf` in one block gets multiplied by the `0.` from earlier blocks, producing `nan` where NumPy's single-pass result would not. (Presumably this is the core of the discrepancy — to be confirmed.) |
|
But then I guess you are committed to computing linearly... |
|
Ok I put up a fix for the tests in #7089. I didn't want to just push it to your branch but here's the diff in case that's easier: diff --git a/dask/array/reductions.py b/dask/array/reductions.py
index dd2b9f04..908e6873 100644
--- a/dask/array/reductions.py
+++ b/dask/array/reductions.py
@@ -1194,7 +1194,7 @@ def prefixscan_blelloch(func, preop, binop, x, axis=None, dtype=None, out=None):
dask array
"""
if axis is None:
- x = x.flatten()
+ x = x.flatten().rechunk(chunks=x.npartitions)
axis = 0
if dtype is None:
dtype = getattr(func(np.empty((0,), dtype=x.dtype)), "dtype", object)
@@ -1340,7 +1340,7 @@ def cumreduction(
)
if axis is None:
- x = x.flatten()
+ x = x.flatten().rechunk(chunks=x.npartitions)
axis = 0
if dtype is None:
dtype = getattr(func(np.empty((0,), dtype=x.dtype)), "dtype", object)
diff --git a/dask/array/tests/test_reductions.py b/dask/array/tests/test_reductions.py
index a773a203..c166b863 100644
--- a/dask/array/tests/test_reductions.py
+++ b/dask/array/tests/test_reductions.py
@@ -546,6 +546,10 @@ def test_array_cumreduction_axis(func, use_nan, axis, method):
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
+ if func in ["cumprod", "nancumprod"] and method == "blelloch" and axis is None:
+ with pytest.warns(RuntimeWarning):
+ da_func(d, axis=axis, method=method).compute()
+ return
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method) |
|
OK, so there are two bugs here; that test was relying on the chunked computation overflowing in the same places as NumPy's single-pass computation, which the per-block merge does not guarantee. |
Or on the other hand, maybe that means the test was broken and not really checking what it intended to. |
I think that's the correct interpretation. My preference is to merge this PR with the partial fix from #7089 and write up a separate issue to fix cumprod. |
|
superseded by #7089 |
The NumPy 1.20.0 release candidate is already in Fedora Rawhide for testing, and 34 tests fail. This fixes most of them.
black dask / flake8 dask