-
-
Notifications
You must be signed in to change notification settings - Fork 757
Open
Description
distributed.default_client() calls distributed.client._get_global_client which copies _global_clients in a thread unsafe way
__________________________________ test_race ___________________________________
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43621', workers: 0, cores: 0, tasks: 2>
workers = (<Nanny: None, threads: 2>, <Nanny: None, threads: 2>, <Nanny: None, threads: 2>, <Nanny: None, threads: 2>, <Nanny: None, threads: 2>)
f = <function test_race.<locals>.f at 0x7fa98979c9d0>
v = <distributed.variable.Variable object at 0x7fa979f7e850>
x = <Future: cancelled, type: int, key: int-c0a8a20f903a4915b94db8de3ea63195>
@pytest.mark.slow
@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=60)
async def test_race(c, s, *workers):
NITERS = 50
def f(i):
with worker_client() as c:
v = Variable("x", client=c)
for _ in range(NITERS):
future = v.get()
x = future.result()
y = c.submit(inc, x)
v.set(y)
sleep(0.01 * random.random())
result = v.get().result()
sleep(0.1) # allow fire-and-forget messages to clear
return result
v = Variable("x", client=c)
x = await c.scatter(1)
await v.set(x)
futures = c.map(f, range(15))
> results = await c.gather(futures)
distributed/tests/test_variable.py:215:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/client.py:2007: in _gather
raise exception.with_traceback(traceback)
distributed/tests/test_variable.py:198: in f
with worker_client() as c:
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/contextlib.py:119: in __enter__
return next(self.gen)
distributed/worker_client.py:53: in worker_client
client = get_client(timeout=timeout)
distributed/worker.py:4244: in get_client
return worker._get_client(timeout=timeout)
distributed/worker.py:4097: in _get_client
client = default_client()
distributed/client.py:5121: in default_client
c = c or _get_global_client()
distributed/client.py:120: in _get_global_client
L = sorted(list(_global_clients), reverse=True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> for k, wr in self.data.items():
E RuntimeError: dictionary changed size during iteration
/usr/share/miniconda3/envs/dask-distributed/lib/python3.9/weakref.py:223: RuntimeError
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels