Initial pass at blockwise array creation routines. #6931
jrbourbeau merged 12 commits into dask:master
Conversation
Thanks for working on this @ian-r-rose! cc @rjzamora
dask/array/blockwise.py
Outdated
# out_ind = tuple(range(len(shape)))
out_ind = string.ascii_lowercase[: len(shape)]  # mayday
If I could get your eyes on this @rjzamora, I'd appreciate it. Based on my reading of the docs, output_indices should be able to be either a character string like "ijk" or a tuple like (1, 2, 3). Since we don't know the number of dimensions of an ndarray a priori, my intention was to use the latter. However, I ran into some issues with it that I didn't see with the string-based approach.
It seems that the difference lies here:
Line 546 in 2a7ab52
which strikes me as odd. I don't think this particular code path is exercised much; might it not be working as intended, or do I misunderstand what valid inputs should be here?
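As a side note, both index forms work at the user-facing da.blockwise level; a minimal sketch of the equivalence (independent of the BlockwiseIO code path in question):

```python
import numpy as np
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2))

# String form: one letter per output dimension
y1 = da.blockwise(np.negative, "ij", x, "ij", dtype=x.dtype)

# Tuple form: integer labels, handy when ndim is only known at runtime
ind = tuple(range(x.ndim))
y2 = da.blockwise(np.negative, ind, x, ind, dtype=x.dtype)

assert (y1.compute() == y2.compute()).all()
```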
If I use the tuple line, I get
import dask.array
dask.array.ones((10, 10), chunks=(1, 1)).compute()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-1-ac15502bbc95> in <module>
1 import dask.array
2
----> 3 dask.array.ones((10,10), chunks=(1,1)).compute()
~/dask/dask/dask/base.py in compute(self, **kwargs)
278 dask.base.compute
279 """
--> 280 (result,) = compute(self, traverse=False, **kwargs)
281 return result
282
~/dask/dask/dask/base.py in compute(*args, **kwargs)
566 postcomputes.append(x.__dask_postcompute__())
567
--> 568 results = schedule(dsk, keys, **kwargs)
569 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
570
~/dask/dask/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
74 pools[thread][num_workers] = pool
75
---> 76 results = get_async(
77 pool.apply_async,
78 len(pool._pool),
~/dask/dask/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
484 _execute_task(task, data) # Re-execute locally
485 else:
--> 486 raise_exception(exc, tb)
487 res, worker_id = loads(res_info)
488 state["cache"][key] = res
~/dask/dask/dask/local.py in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
317
318
~/dask/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
220 try:
221 task, data = loads(task_info)
--> 222 result = _execute_task(task, data)
223 id = get_id()
224 result = dumps((result, id))
~/dask/dask/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/dask/dask/dask/optimization.py in __call__(self, *args)
961 if not len(args) == len(self.inkeys):
962 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
964
965 def __reduce__(self):
~/dask/dask/dask/core.py in get(dsk, out, cache)
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
~/dask/dask/dask/core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
TypeError: __call__() takes 2 positional arguments but 3 were given
You are right to question this :)
This is a new piece of logic that has only been tested for the few DataFrame-focused cases where BlockwiseIO is used. It certainly seems that we need to consider the case that output_indices is a tuple - Sorry about that!
EDIT: Since the original code was targeting DataFrames, I may have mistaken the desired structure of dsk - I'll have to think about this.
Thanks for pointing this out - If I understand correctly, there is no reason to use the dimensions here to define dsk. For BlockwiseIO, there should always be a single "dummy" variable (which will be replaced with the packed args for the IO function at run time). I submitted #6933 with the simple fix.
(force-pushed from 5d162a4 to ecb2f58)
@rjzamora I've cherry-picked your commit from #6933 -- it seems to fix the immediate issue, though there may be some downstream things to fix up. The following snippet adding a new axis (adapted from a test) fails for what appear to be dimensionality-related reasons:
import numpy as np
import dask.array as da
def f(x):
return x[:, None] * np.ones((1, 7))
x = da.ones(5, chunks=2)
y = da.blockwise(
f, "aq", x, "a", new_axes={"q": 7}, concatenate=True, dtype=x.dtype
)
y.compute()
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
<ipython-input-1-d61b664d7769> in <module>
8 f, "aq", x, "a", new_axes={"q": 7}, concatenate=True, dtype=x.dtype
9 )
---> 10 y.compute()
~/dask/dask/dask/base.py in compute(self, **kwargs)
278 dask.base.compute
279 """
--> 280 (result,) = compute(self, traverse=False, **kwargs)
281 return result
282
~/dask/dask/dask/base.py in compute(*args, **kwargs)
560 )
561
--> 562 dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
563 keys, postcomputes = [], []
564 for x in collections:
~/dask/dask/dask/base.py in collections_to_dsk(collections, optimize_graph, **kwargs)
331 dsk, keys = _extract_graph_and_keys(val)
332 groups[opt] = (dsk, keys)
--> 333 _opt = opt(dsk, keys, **kwargs)
334 _opt_list.append(_opt)
335
~/dask/dask/dask/array/optimization.py in optimize(dsk, keys, fuse_keys, fast_functions, inline_functions_fast_functions, rename_fused_keys, **kwargs)
46 dsk = optimize_blockwise(dsk, keys=keys)
47 dsk = fuse_roots(dsk, keys=keys)
---> 48 dsk = dsk.cull(set(keys))
49
50 if not config.get("optimization.fuse.active"):
~/dask/dask/dask/highlevelgraph.py in cull(self, keys)
625 """
626
--> 627 all_ext_keys = self.get_all_external_keys()
628 ret_layers = {}
629 ret_key_deps = {}
~/dask/dask/dask/highlevelgraph.py in get_all_external_keys(self)
516 self._all_external_keys = set()
517 for layer in self.layers.values():
--> 518 self._all_external_keys.update(layer.get_output_keys())
519 return self._all_external_keys
520
~/dask/dask/dask/blockwise.py in get_output_keys(self)
284 (self.output, *p)
285 for p in itertools.product(
--> 286 *[range(self.dims[i]) for i in self.output_indices]
287 )
288 }
~/dask/dask/dask/blockwise.py in <listcomp>(.0)
284 (self.output, *p)
285 for p in itertools.product(
--> 286 *[range(self.dims[i]) for i in self.output_indices]
287 )
288 }
KeyError: '.1'

I'm not certain that this is related to the above issue, but it might be.
Interesting - Not sure what is causing the problem offhand. I'll take a look.
Ah.. There was a copy-paste bug in the BlockwiseIO constructor that I just fixed in #6933 - I'm still getting an error for your example (albeit a different one). I'm still trying to figure out if the new error is another bug of mine :)
Some follow-on difficulties with fusion:
x = da.full(2, 7, chunks=(2,))
y = da.full(2, 10, chunks=(2,))
c = da.add(x, y)
c.compute(optimize_graph=False)  # Succeeds
c.compute(optimize_graph=True)   # Fails (the resulting graph is empty)
Any thoughts as to what might be happening @rjzamora?
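For reference, both the expected result and the fused graph can be checked directly; a sketch using dask's internal optimize_blockwise helper (an internal API whose exact location may shift between versions):

```python
import dask.array as da
from dask.core import flatten
from dask.blockwise import optimize_blockwise

x = da.full(2, 7, chunks=(2,))
y = da.full(2, 10, chunks=(2,))
c = da.add(x, y)

# The expected result, regardless of how the layers get fused
assert c.compute().tolist() == [17, 17]

# Inspect what blockwise fusion does to the high-level graph
keys = list(flatten(c.__dask_keys__()))
fused = optimize_blockwise(c.__dask_graph__(), keys=keys)
assert len(fused.layers) <= len(c.__dask_graph__().layers)
```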
Hmmm - I will look into this. It may be that this is the first case where we are trying to fuse two BlockwiseIO layers together, and we are losing one of the IO subgraphs. It may turn out that we need to support a collection of IO subgraphs in BlockwiseIO.
Okay @rjzamora, here is the next blockwise failure case: array creation -> map_blocks with broadcasting. I think this one verges on a case where we might not want to do it blockwise (or defer that until later), but it should at least not error:
import dask.array as da

def func(x, y, z):
    return x + y + z

a = da.ones((3, 4), chunks=(3, 2))
b = da.ones((6, 2), chunks=(3, 2))
c = da.ones((4,), chunks=(2,))
d = da.map_blocks(func, a, b, c, chunks=((3, 3), (2, 2)), dtype=a.dtype)
d.compute()
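For context, the per-block arithmetic itself is plain NumPy broadcasting (the chunking above hands func blocks of shapes (3, 2), (3, 2), and (2,)), so the failure is in graph construction rather than in func; a quick sanity check with NumPy alone:

```python
import numpy as np

def func(x, y, z):
    return x + y + z

# One chunk of each input, with the per-block shapes produced by the
# chunking in the snippet above; these broadcast cleanly under NumPy rules
out = func(np.ones((3, 2)), np.ones((3, 2)), np.ones((2,)))
assert out.shape == (3, 2)
assert (out == 3.0).all()
```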
Or, an even simpler example that involves some inter-block communication:
y = da.ones((2, 2), chunks=1)
y.transpose().compute()
It seems to fail for similar reasons to the above -- some arg substitution isn't happening.
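For reference, transpose is the canonical index-permutation case for blockwise (this mirrors the example in the dask docs: swap the block indices and transpose each block), so a working implementation should handle it:

```python
import numpy as np
import dask.array as da

# Use distinct values so a missed transpose would be visible
x = da.from_array(np.arange(4).reshape(2, 2), chunks=1)

# Output index "ji" permutes the block grid; np.transpose flips each block
t = da.blockwise(np.transpose, "ji", x, "ij", dtype=x.dtype)
assert (t.compute() == np.arange(4).reshape(2, 2).T).all()
```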
(force-pushed from c5d06c4 to 502218f)
Thanks @ian-r-rose! It looks like I may just need to do task substitution a bit "less efficiently" (and not worry about knowing the specific indexing rule to search for) until I can work out a more clever approach.
@ian-r-rose - I submitted #6959 to (hopefully) address the latest bug you discovered. I would have pushed to this PR, but I don't believe I have permissions to do that.
(force-pushed from 502218f to 0828928)
Hmm, I have the box checked that allows maintainers to push commits. Perhaps there was a conflict? (n.b., I just rebased and force-pushed)
Right - I don't have maintainer/write privileges :) |
(force-pushed from 0828928 to 8055a47)
Co-authored-by: Richard (Rick) Zamora <rzamora217@gmail.com>
rjzamora
left a comment
This looks good @ian-r-rose - Thanks for the work here!
I made some minor suggestions, but I am also happy with the PR as is.
ndim = len(self.chunks)
chunk = tuple(self.chunks[i][idx[i]] for i in range(ndim))
return (self.func, chunk)
This looks correct, and makes sense - You are using partial to wrap all options into self.func, because it is the same for every chunk. The only argument that may vary between two different chunks is the shape of that "chunk". Please correct me if I am missing anything :)
My suggestion is to add a sentence or two in the docstring to explain/document this assumption.
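A hypothetical sketch of the assumption being discussed (np.full stands in for the wrapped creation function; the chunks, idx, and task names here are illustrative, not the actual class internals):

```python
from functools import partial
import numpy as np

# All options are bound into the function once, since they are the same
# for every chunk; each task then only needs its own chunk's shape.
func = partial(np.full, fill_value=7, dtype="float64")

chunks = ((3, 3), (2, 2))  # two chunks along each axis
idx = (1, 0)               # block index within the chunk grid
ndim = len(chunks)
chunk = tuple(chunks[i][idx[i]] for i in range(ndim))

task = (func, chunk)       # shape of the (self.func, chunk) tuple in the diff
result = task[0](task[1])
assert result.shape == (3, 2)
assert (result == 7).all()
```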
@rjzamora You are right -- I'm currently thinking through whether this needs to be more flexible for other functions than initially implemented here. At the end of the day, we need to be able to ascertain everything we need from the key, but more info may need to be bound to any given BlockwiseIO to get stuff like from_array working.
but more info may need to be bound to any given BlockwiseIO to get stuff like from_array working.
Good point - CreateArraySubgraph + BlockwiseCreateArray does not target the case where you have input data. I'm unsure if we want to use the same code path for from_array (maybe we do). For now, I'm okay with the solution focusing on pure "creation."
I'm wondering if it would make sense to pull out something like this logic for block_info and make it an optional argument to func.
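For what it's worth, map_blocks already supports this pattern: when func accepts a block_info keyword argument, dask passes per-block metadata (including the block's location within the full array). A small example:

```python
import dask.array as da

def offset_by_start(x, block_info=None):
    # block_info[None] describes the output block; "array-location" gives
    # the (start, stop) range this block covers along each axis
    (start, _stop), = block_info[None]["array-location"]
    return x + start

a = da.zeros((6,), chunks=2)
b = a.map_blocks(offset_by_start, dtype=a.dtype)
assert b.compute().tolist() == [0, 0, 2, 2, 4, 4]
```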
Thanks for the review @rjzamora. I think there will need to be some follow-up work for things like from_array.
Nice green check marks @ian-r-rose @rjzamora! I'll make sure to look through the changes here tomorrow.
jrbourbeau
left a comment
There was a problem hiding this comment.
Thanks for all your work on this @ian-r-rose @rjzamora!
Thanks for the review @jrbourbeau, and thanks for all the back-and-forth @rjzamora!
This seems to have broken some xarray tests: pydata/xarray#4703. KeyErrors are raised for some keys. Sorry, I don't have a minimal example yet.
Thanks @dcherian! It looks like blockwise fusion is getting tripped up in some cases - I'm looking into it now.
This reverts commit da7b56d.
This is a start at adding blockwise high-level layers for dask array creation routines (for the moment, ones, zeros, and full), addressing part of #6791. I've patterned this after the blockwise CSV reader Layer.
black dask / flake8 dask
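To illustrate the payoff: a blockwise creation routine builds a single high-level graph layer rather than one materialized task per chunk. A quick sketch of checking this (exact layer types and counts may vary across dask versions):

```python
import numpy as np
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2))

# One high-level layer for the whole creation routine, regardless of how
# many chunks it will expand into at execution time
assert len(x.__dask_graph__().layers) == 1
assert (x.compute() == np.ones((4, 4))).all()
```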