
Reintroduce __setitem__ which more closely matches NumPy #7261

@ryan-williams

Description


Trying to run tests on current master (3d0e7bd) on an Amazon Linux m4.10xlarge (40 cores), I see dask/array/tests/test_array_core.py::test_setitem_extended_API allocating OS threads up to ≈32k, at which point I get one of a variety of errors from within threading or multiprocessing (and potentially further errors in subsequent tests, ultimately segfaulting the Python process):

# Run the test and send it to the background
pytest -v dask/array/tests/test_array_core.py::test_setitem_extended_API &>test-out.txt &

# Print the number of OS threads in use every second
while true; do ps -fLu $USER | wc -l; sleep 1; done

Output:

17
1823
5134
7714
10055
12960
14766
16744
18749
20395
21560
22893
24183
25602
25989
26634
27709
28655
29472
30375
30934
31442
32009
32176
32176
32176
32176
17810
[1]+  Exit 1                  pytest -v dask/array/tests/test_array_core.py::test_setitem_extended_API &>test-out.txt
16
16
16
16
test-out.txt:
============================= test session starts ==============================
platform linux -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 -- /home/rwilliams/.pyenv/versions/3.7.10/bin/python3.7
cachedir: .pytest_cache
rootdir: /home/rwilliams/c/dask, configfile: setup.cfg
collecting ... collected 1 item

dask/array/tests/test_array_core.py::test_setitem_extended_API FAILED    [100%]

=================================== FAILURES ===================================
__________________________ test_setitem_extended_API ___________________________

    def test_setitem_extended_API():
        x = np.ma.arange(60).reshape((6, 10))
        dx = da.from_array(x.copy(), chunks=(2, 2))

        x[:, 2] = range(6)
        x[3, :] = range(10)
        x[::2, ::-1] = -1
        x[1::2] = -2
        x[:, [3, 5, 6]] = -3
        x[2:4, x[0] > 3] = -5
        x[2, x[0] < -2] = -7
        x[x % 2 == 0] = -8
        x[[4, 3, 1]] = -9
        x[5, ...] = -10
        x[..., 4] = -11
        x[2:4, 5:1:-2] = -x[:2, 4:1:-2]
        x[:2, :3] = [[1, 2, 3]]
        x[1, 1:7:2] = np.ma.masked
        x[0, 1:3] = -x[0, 4:2:-1]
        x[...] = x
        x[...] = x[...]
        x[0] = x[-1]
        x[0, :] = x[-2, :]
        x[:, 1] = x[:, -3]
        x[[True, False, False, False, True, False], 2] = -4
        x[3, [True, True, False, True, True, False, True, False, True, True]] = -5
        x[
            4,
            da.from_array(
                [False, False, True, True, False, False, True, False, False, True]
            ),
        ] = -55
        x[np.array([False, False, True, True, False, False]), 5:7] = -66

        dx[:, 2] = range(6)
        dx[3, :] = range(10)
        dx[::2, ::-1] = -1
        dx[1::2] = -2
        dx[:, [3, 5, 6]] = -3
        dx[2:4, dx[0] > 3] = -5
        dx[2, dx[0] < -2] = -7
        dx[dx % 2 == 0] = -8
        dx[[4, 3, 1]] = -9
        dx[5, ...] = -10
        dx[..., 4] = -11
        dx[2:4, 5:1:-2] = -dx[:2, 4:1:-2]
        dx[:2, :3] = [[1, 2, 3]]
        dx[1, 1:7:2] = np.ma.masked
        dx[0, 1:3] = -dx[0, 4:2:-1]
        dx[...] = dx
        dx[...] = dx[...]
        dx[0, :] = dx[-2, :]
        dx[:, 1] = dx[:, -3]
        dx[[True, False, False, False, True, False], 2] = -4
        dx[3, [True, True, False, True, True, False, True, False, True, True]] = -5
        dx[
            4,
            da.from_array(
                [False, False, True, True, False, False, True, False, False, True]
            ),
        ] = -55
        dx[np.array([False, False, True, True, False, False]), 5:7] = -66

        assert_eq(x, dx.compute())
>       assert_eq(x.mask, da.ma.getmaskarray(dx))

dask/array/tests/test_array_core.py:3677:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/utils.py:265: in assert_eq
    b, check_shape=check_shape, check_graph=check_graph
dask/array/utils.py:239: in _get_dt_meta_computed
    x = x.compute(scheduler="sync")
dask/base.py:281: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
    results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
    return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
    fire_task()
dask/local.py:467: in fire_task
    callback=queue.put,
dask/local.py:517: in apply_sync
    res = func(*args, **kwds)
dask/local.py:227: in execute_task
    result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
    result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
    return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
    return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
    v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
    x = self.compute()
dask/base.py:281: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
    results = schedule(dsk, keys, **kwargs)
dask/threaded.py:84: in get
    **kwargs
dask/local.py:487: in get_async
    raise_exception(exc, tb)
dask/local.py:317: in reraise
    raise exc
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
    result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
    return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
    return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
    v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
    x = self.compute()
dask/base.py:281: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
    results = schedule(dsk, keys, **kwargs)
dask/threaded.py:84: in get
    **kwargs
dask/local.py:487: in get_async
    raise_exception(exc, tb)
dask/local.py:317: in reraise
    raise exc
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
    result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
    return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
    return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
    v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
    x = self.compute()
dask/base.py:281: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
    results = schedule(dsk, keys, **kwargs)
dask/threaded.py:84: in get
    **kwargs
dask/local.py:487: in get_async
    raise_exception(exc, tb)
dask/local.py:317: in reraise
    raise exc
dask/local.py:222: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
    result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
    return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
    return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
    return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
    return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
    v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
    return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
    x = self.compute()
dask/base.py:281: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
    results = schedule(dsk, keys, **kwargs)
dask/threaded.py:72: in get
    pool = ThreadPool(num_workers)
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/pool.py:802: in __init__
    Pool.__init__(self, processes, initializer, initargs)
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/pool.py:176: in __init__
    self._repopulate_pool()
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/pool.py:241: in _repopulate_pool
    w.start()
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/dummy/__init__.py:51: in start
    threading.Thread.start(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <DummyProcess(Thread-32160, initial daemon)>

    def start(self):
        """Start the thread's activity.

        It must be called at most once per thread object. It arranges for the
        object's run() method to be invoked in a separate thread of control.

        This method will raise a RuntimeError if called more than once on the
        same thread object.

        """
        if not self._initialized:
            raise RuntimeError("thread.__init__() not called")

        if self._started.is_set():
            raise RuntimeError("threads can only be started once")
        with _active_limbo_lock:
            _limbo[self] = self
        try:
>           _start_new_thread(self._bootstrap, ())
E           RuntimeError: can't start new thread

../../.pyenv/versions/3.7.10/lib/python3.7/threading.py:852: RuntimeError
============================= slowest 10 durations =============================
45.06s call     dask/array/tests/test_array_core.py::test_setitem_extended_API
0.01s teardown dask/array/tests/test_array_core.py::test_setitem_extended_API

(1 durations < 0.005s hidden.  Use -vv to show these durations.)
============================== 1 failed in 55.67s ==============================

This test seems to work fine on my 8-core MacBook (and in GHA, presumably). I'm wondering whether there are relevant system settings or libraries that should be checked?

I repro'd the error on Python 3.7.10, 3.8.6, and 3.8.8 (the error messages sometimes differed between versions).

The test was added in #7033, so perhaps it should just be skipped while we debug? I am going to do that locally, at least.
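Reading the traceback, the repeated pattern is dask/array/slicing.py:1729 calling np.asanyarray() on a dask array, which triggers Array.__array__ → compute() → a fresh ThreadPool (dask/threaded.py:72) from inside an already-running task. The following is a toy model of that recursion, not dask's actual code; NUM_WORKERS and the recursion depth are illustrative. It shows how nested compute() calls multiply the OS thread count:

```python
from multiprocessing.pool import ThreadPool

NUM_WORKERS = 4  # dask's threaded scheduler defaults to the CPU count -- 40 on the m4.10xlarge


def nested_compute(depth):
    """Model a compute() whose tasks each trigger another compute().

    Returns the number of ThreadPools created in this call tree; each
    pool holds NUM_WORKERS OS threads while it is alive.
    """
    if depth == 0:
        return 0
    # One fresh pool per compute(), mirroring dask/threaded.py:72
    pool = ThreadPool(NUM_WORKERS)
    try:
        # Every worker task recursively "computes" again
        pools_below = sum(pool.map(nested_compute, [depth - 1] * NUM_WORKERS))
    finally:
        pool.close()
        pool.join()
    return 1 + pools_below


pools = nested_compute(2)
print(pools * NUM_WORKERS)  # 5 pools * 4 workers = 20 threads
```

With 40 workers and the many setitem calls in the test each embedding dask arrays as values, it seems plausible this compounds toward the ≈32k ceiling observed above.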

FWIW, here are my ulimits:

ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 644539
max locked memory       (kbytes, -l) unlimited
max memory size         (kbytes, -m) unlimited
open files                      (-n) 8192
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 644539
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

None of them exactly matches the ≈2^15 mark where the error happens, but the exploding thread count hitting a 32k ceiling is consistent across runs.
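One guess: the ≈32k ceiling is suspiciously close to Linux's default kernel.pid_max of 32768, which threads count against and which ulimit does not show. These kernel/cgroup limits can be inspected directly (file paths vary by kernel and cgroup version, hence the fallbacks):

```shell
# System-wide limits that can cap thread creation on Linux; any of these
# files may be absent depending on kernel/cgroup configuration.
cat /proc/sys/kernel/threads-max 2>/dev/null || true  # kernel-wide thread cap
cat /proc/sys/kernel/pid_max 2>/dev/null || true      # default 32768 (2^15); each thread consumes a PID
cat /sys/fs/cgroup/pids.max 2>/dev/null || true       # cgroup v2 task limit, if any
```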

cc @davidhassell for visibility
