I'm getting the following exception from a binary tree reduction that I'm building out of futures (based largely on an example in the dask docs that uses delayed instead of futures).
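For context, the delayed-based pattern from the docs that I adapted looks roughly like this (my own paraphrase from memory, so the function name and exact structure are mine, not the docs'):

```python
import dask

def tree_reduce_delayed(objs, func=sum):
    # Same binary tree reduction, but built from dask.delayed objects:
    # each level pairs up the nodes and combines each pair with func.
    objs = [dask.delayed(o) for o in objs]
    while len(objs) > 1:
        new_objs = []
        for i in range(0, len(objs), 2):
            new_objs.append(dask.delayed(func)(objs[i:i + 2]))
        objs = new_objs
    return objs[0].compute()
```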
Here's the function in question:
```python
from dask.distributed import get_client, wait
from toolz import first


def tree_reduce(objs, func=sum):
    # Pair up the futures at each level, submit func on each pair,
    # and wait for the level to finish before starting the next one.
    while len(objs) > 1:
        new_objs = []
        n_objs = len(objs)
        for i in range(0, n_objs, 2):
            inputs = objs[i:i + 2]
            obj = get_client().submit(func, inputs)
            new_objs.append(obj)
        wait(new_objs)
        objs = new_objs
    return first(objs)
```

And my reproducible test:
```python
from dask.distributed import Client

n_parts = 15

client = Client(cluster)

a = client.scatter(range(n_parts))
b = tree_reduce(a)
b = b.result()

assert sum(range(n_parts)) == b
```

The exception is intermittent and happens about 50% of the time:
```
self = <tornado.gen.Runner object at 0x7f2cf6ce5710>

    def run(self) -> None:
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if future is None:
                    raise Exception("No pending future")
                if not future.done():
                    return
                self.future = None
                try:
                    exc_info = None
                    try:
>                       value = future.result()
E                       concurrent.futures._base.CancelledError
```

I've tried doing a dask.distributed.wait after each level of the tree. I've also tried waiting for b in the reproducible example before calling result. I'm completely stumped as to why this is happening. I'm running this on a workstation with 2x V100s.
I believe I'm doing something very wrong but I can't figure out what it is.
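For reference, the variant that waits on b before calling result looks roughly like this (just a sketch of what I tried; nothing new beyond the extra wait):

```python
from dask.distributed import Client, wait

n_parts = 15

client = Client(cluster)

a = client.scatter(range(n_parts))
b = tree_reduce(a)

# Explicitly wait for the final future before asking for its result
wait([b])
b = b.result()

assert sum(range(n_parts)) == b
```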