-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Trying to run tests on current master (3d0e7bd) on an Amazon Linux m4.10xlarge (40 cores), I see dask/array/tests/test_array_core.py::test_setitem_extended_API allocating OS threads up to 32k at which point I get one of a variety of possible errors from within threading or multiprocessing (and also potentially in subsequent tests that try to run, ultimately segfault'ing the Python process):
# Run test + send to background
pytest -v dask/array/tests/test_array_core.py::test_setitem_extended_API &>test-out.txt &
# Print the number of OS threads in use every 1s
while [ 1 ] ; do ps -fLu $USER | wc -l; sleep 1; done
Output:
17
1823
5134
7714
10055
12960
14766
16744
18749
20395
21560
22893
24183
25602
25989
26634
27709
28655
29472
30375
30934
31442
32009
32176
32176
32176
32176
17810
[1]+ Exit 1 pytest -v dask/array/tests/test_array_core.py::test_setitem_extended_API &>test-out.txt
16
16
16
16
test-out.txt
============================= test session starts ==============================
platform linux -- Python 3.7.10, pytest-6.2.2, py-1.10.0, pluggy-0.13.1 -- /home/rwilliams/.pyenv/versions/3.7.10/bin/python3.7
cachedir: .pytest_cache
rootdir: /home/rwilliams/c/dask, configfile: setup.cfg
collecting ... collected 1 item
dask/array/tests/test_array_core.py::test_setitem_extended_API FAILED [100%]
=================================== FAILURES ===================================
__________________________ test_setitem_extended_API ___________________________
def test_setitem_extended_API():
x = np.ma.arange(60).reshape((6, 10))
dx = da.from_array(x.copy(), chunks=(2, 2))
x[:, 2] = range(6)
x[3, :] = range(10)
x[::2, ::-1] = -1
x[1::2] = -2
x[:, [3, 5, 6]] = -3
x[2:4, x[0] > 3] = -5
x[2, x[0] < -2] = -7
x[x % 2 == 0] = -8
x[[4, 3, 1]] = -9
x[5, ...] = -10
x[..., 4] = -11
x[2:4, 5:1:-2] = -x[:2, 4:1:-2]
x[:2, :3] = [[1, 2, 3]]
x[1, 1:7:2] = np.ma.masked
x[0, 1:3] = -x[0, 4:2:-1]
x[...] = x
x[...] = x[...]
x[0] = x[-1]
x[0, :] = x[-2, :]
x[:, 1] = x[:, -3]
x[[True, False, False, False, True, False], 2] = -4
x[3, [True, True, False, True, True, False, True, False, True, True]] = -5
x[
4,
da.from_array(
[False, False, True, True, False, False, True, False, False, True]
),
] = -55
x[np.array([False, False, True, True, False, False]), 5:7] = -66
dx[:, 2] = range(6)
dx[3, :] = range(10)
dx[::2, ::-1] = -1
dx[1::2] = -2
dx[:, [3, 5, 6]] = -3
dx[2:4, dx[0] > 3] = -5
dx[2, dx[0] < -2] = -7
dx[dx % 2 == 0] = -8
dx[[4, 3, 1]] = -9
dx[5, ...] = -10
dx[..., 4] = -11
dx[2:4, 5:1:-2] = -dx[:2, 4:1:-2]
dx[:2, :3] = [[1, 2, 3]]
dx[1, 1:7:2] = np.ma.masked
dx[0, 1:3] = -dx[0, 4:2:-1]
dx[...] = dx
dx[...] = dx[...]
dx[0, :] = dx[-2, :]
dx[:, 1] = dx[:, -3]
dx[[True, False, False, False, True, False], 2] = -4
dx[3, [True, True, False, True, True, False, True, False, True, True]] = -5
dx[
4,
da.from_array(
[False, False, True, True, False, False, True, False, False, True]
),
] = -55
dx[np.array([False, False, True, True, False, False]), 5:7] = -66
assert_eq(x, dx.compute())
> assert_eq(x.mask, da.ma.getmaskarray(dx))
dask/array/tests/test_array_core.py:3677:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/utils.py:265: in assert_eq
b, check_shape=check_shape, check_graph=check_graph
dask/array/utils.py:239: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:281: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:467: in fire_task
callback=queue.put,
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
x = self.compute()
dask/base.py:281: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
results = schedule(dsk, keys, **kwargs)
dask/threaded.py:84: in get
**kwargs
dask/local.py:487: in get_async
raise_exception(exc, tb)
dask/local.py:317: in reraise
raise exc
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
x = self.compute()
dask/base.py:281: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
results = schedule(dsk, keys, **kwargs)
dask/threaded.py:84: in get
**kwargs
dask/local.py:487: in get_async
raise_exception(exc, tb)
dask/local.py:317: in reraise
raise exc
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
x = self.compute()
dask/base.py:281: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
results = schedule(dsk, keys, **kwargs)
dask/threaded.py:84: in get
**kwargs
dask/local.py:487: in get_async
raise_exception(exc, tb)
dask/local.py:317: in reraise
raise exc
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:121: in <genexpr>
return func(*(_execute_task(a, cache) for a in args))
dask/core.py:115: in _execute_task
return [_execute_task(a, cache) for a in arg]
dask/core.py:115: in <listcomp>
return [_execute_task(a, cache) for a in arg]
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:35: in apply
return func(*args, **kwargs)
dask/array/core.py:446: in _pass_extra_kwargs
return func(*args[len(keys) :], **kwargs)
dask/array/slicing.py:1729: in setitem
v = np.asanyarray(value[tuple(value_indices)])
../../.pyenv/versions/3.7.10/lib/python3.7/site-packages/numpy/core/_asarray.py:171: in asanyarray
return array(a, dtype, copy=False, order=order, subok=True)
dask/array/core.py:1454: in __array__
x = self.compute()
dask/base.py:281: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:563: in compute
results = schedule(dsk, keys, **kwargs)
dask/threaded.py:72: in get
pool = ThreadPool(num_workers)
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/pool.py:802: in __init__
Pool.__init__(self, processes, initializer, initargs)
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/pool.py:176: in __init__
self._repopulate_pool()
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/pool.py:241: in _repopulate_pool
w.start()
../../.pyenv/versions/3.7.10/lib/python3.7/multiprocessing/dummy/__init__.py:51: in start
threading.Thread.start(self)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <DummyProcess(Thread-32160, initial daemon)>
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
> _start_new_thread(self._bootstrap, ())
E RuntimeError: can't start new thread
../../.pyenv/versions/3.7.10/lib/python3.7/threading.py:852: RuntimeError
============================= slowest 10 durations =============================
45.06s call dask/array/tests/test_array_core.py::test_setitem_extended_API
0.01s teardown dask/array/tests/test_array_core.py::test_setitem_extended_API
(1 durations < 0.005s hidden. Use -vv to show these durations.)
============================== 1 failed in 55.67s ==============================
This test seems to work fine on my 8-core macbook (and in GHA, presumably). I'm wondering if there are relevant system libraries that should be checked?
I repro'd the error on 3.7.10, 3.8.6, and 3.8.8 (the error msgs were sometimes different on each one).
The test was added in #7033, so perhaps it should just be skipped (e.g. with @pytest.mark.skip) while we debug? I am going to do that locally at least.
- Dask version: ≥ 3f7aa3f0 / Extend __setitem__ to more closely match numpy #7033
- Python version: 3.7.10, 3.8.6, 3.8.8
- Operating System: Amazon Linux 2 (CentOS/RHEL/Fedora family)
- Install method (conda, pip, source):
pip install -e .[complete] && pip install pytest
FWIW, here are my ulimits:
ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 644539
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 8192
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 644539
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
None of them seem to exactly match the ≈2^15 where I see the error happen, but the exploding thread counts + 32k ceiling seem pretty consistent.
cc @davidhassell for visibility