Closed
Description
For some reason, the lock appears to be held elsewhere when a task checks it. This only happens when I run two worker processes with two threads each and restrict task concurrency with resources:

    import time

    from dask.distributed import Client
    from dask.utils import SerializableLock


    def dummy(i: int) -> int:
        return i


    def constrained(lock: SerializableLock, dummy: int):
        # The lock should be free on entry: the "foo" resource is meant to
        # allow only one of these tasks per worker at a time.
        assert not lock.locked()
        with lock:
            time.sleep(0.005)
        assert not lock.locked()


    N_TASKS = 1000
    lock = SerializableLock()
    futures = []

    if __name__ == "__main__":
        with Client(resources={"foo": 1}, n_workers=2, threads_per_worker=2) as client:
            for i in range(N_TASKS):
                x = client.submit(dummy, i)
                # Each constrained task claims the whole "foo" resource.
                futures.append(client.submit(constrained, lock, x, resources={"foo": 1}))
            client.gather(futures)

Things that might be wrong:
- Resources are not being respected? I should check which other tasks are running when this failure occurs.
- State is accidentally shared between the two processes? I should try running this with `dask-worker` processes created separately.
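
To help narrow down which of these applies, here is a rough sketch of a more talkative variant of `constrained` that reports the process ID and thread name whenever it finds the lock held. The name `constrained_verbose` and the `hold` parameter are hypothetical and not part of the original reproducer. It only assumes the lock exposes `locked()` and works as a context manager, which is true of both `threading.Lock` and `SerializableLock`.

```python
import os
import threading
import time


def constrained_verbose(lock, value, hold=0.005):
    """Like `constrained`, but say where the lock was observed held.

    Hypothetical diagnostic helper, not part of the original reproducer.
    """
    # Identify the process and thread making the observation.
    where = f"pid={os.getpid()} thread={threading.current_thread().name}"
    if lock.locked():
        raise RuntimeError(f"lock already held on entry ({where})")
    with lock:
        time.sleep(hold)
    if lock.locked():
        raise RuntimeError(f"lock held again right after release ({where})")
    return value
```

Submitting this in place of `constrained` should make the failing task's error message show which worker process and thread saw the lock held. If the failures always come from a process where another thread is also running a constrained task, that would point at the resource restriction rather than at cross-process state.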