
Initial pass at blockwise array creation routines. #6931

Merged

jrbourbeau merged 12 commits into dask:master from ian-r-rose:blockwise-array-creation on Dec 17, 2020

Conversation

@ian-r-rose
Collaborator

@ian-r-rose ian-r-rose commented Dec 4, 2020

This is a start at adding blockwise high-level layers for dask array creation routines (for the moment, ones, zeros, and full), addressing part of #6791. I've patterned this after the blockwise CSV reader Layer.

  • Tests added / passed
  • Passes black dask / flake8 dask
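As background for readers, the core idea can be sketched in plain Python (hypothetical names, not the PR's actual classes): each output key maps lazily to a task that builds a single chunk, so the full task dict never has to be materialized up front.

```python
import itertools

# ones((10, 10), chunks=(1, 1)) expressed as chunk sizes per dimension
chunks = ((1,) * 10, (1,) * 10)
numblocks = tuple(len(c) for c in chunks)

def make_task(idx):
    # Shape of the block at grid position `idx`
    shape = tuple(chunks[d][i] for d, i in enumerate(idx))
    return ("np.ones", shape)  # placeholder for the real chunk-creating task

# The blockwise layer only needs to enumerate keys; tasks are built on demand.
keys = list(itertools.product(*map(range, numblocks)))
assert len(keys) == 100
assert make_task((3, 7)) == ("np.ones", (1, 1))
```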

@jrbourbeau
Member

Thanks for working on this @ian-r-rose! cc @rjzamora

Comment on lines +90 to +91
# out_ind = tuple(range(len(shape)))
out_ind = string.ascii_lowercase[: len(shape)] # mayday
Collaborator Author


If I could get your eyes on this @rjzamora, I'd appreciate it. Based on my reading of the docs, output_indices should be able to be either a character string like "ijk" or a tuple like (1, 2, 3). Since we don't know an ndarray's number of dimensions a priori, my intention was to use the latter. However, I ran into some issues with it that I didn't see with the string-based approach.

It seems that the difference lies here:

ninds = 1 if isinstance(output_indices, str) else len(output_indices)

which strikes me as odd. I don't think this particular code path is exercised much; might it not be working as intended, or do I misunderstand what valid inputs should be here?
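For what it's worth, both index conventions encode the number of output dimensions the same way, so a uniform `len(...)` would count them correctly in either form (a standalone sketch, not dask's actual code):

```python
def ndims(output_indices):
    # A string "ij" and a tuple (1, 2) both have length == ndim,
    # so no str special-case seems necessary just for counting dimensions.
    return len(output_indices)

assert ndims("ij") == 2
assert ndims((1, 2)) == 2
assert ndims("abc") == ndims((0, 1, 2)) == 3
```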

Collaborator Author

If I use the tuple line, I get

import dask.array
dask.array.ones((10,10), chunks=(1,1)).compute()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-1-ac15502bbc95> in <module>
      1 import dask.array
      2 
----> 3 dask.array.ones((10,10), chunks=(1,1)).compute()

~/dask/dask/dask/base.py in compute(self, **kwargs)
    278         dask.base.compute
    279         """
--> 280         (result,) = compute(self, traverse=False, **kwargs)
    281         return result
    282 

~/dask/dask/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/dask/dask/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     74                 pools[thread][num_workers] = pool
     75 
---> 76     results = get_async(
     77         pool.apply_async,
     78         len(pool._pool),

~/dask/dask/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

~/dask/dask/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

~/dask/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

~/dask/dask/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/dask/dask/dask/optimization.py in __call__(self, *args)
    961         if not len(args) == len(self.inkeys):
    962             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    964 
    965     def __reduce__(self):

~/dask/dask/dask/core.py in get(dsk, out, cache)
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/dask/dask/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

TypeError: __call__() takes 2 positional arguments but 3 were given
suggesting that the arguments are being unexpectedly unpacked.

Member

@rjzamora rjzamora commented Dec 4, 2020

which strikes me as odd. I don't think this particular code path is exercised much, might it not be working as intended, or do I misunderstand what valid inputs should be here?

You are right to question this :)

This is a new piece of logic that has only been tested for the few DataFrame-focused cases where BlockwiseIO is used. It certainly seems that we need to consider the case that output_indices is a tuple - Sorry about that!

EDIT: Since the original code was targeting DataFrames, I may have mistaken the desired structure of dsk - I'll have to think about this.

Member

Thanks for pointing this out - If I understand correctly, there is no reason to use the dimensions here to define dsk. For BlockwiseIO, there should always be a single "dummy" variable (which will be replaced with the packed args for the IO function at run time). I submitted #6933 with the simple fix.

Collaborator Author

Thanks for looking into it @rjzamora!

@ian-r-rose ian-r-rose force-pushed the blockwise-array-creation branch from 5d162a4 to ecb2f58 Compare December 4, 2020 19:01
@ian-r-rose
Collaborator Author

@rjzamora I've cherry-picked your commit from #6933 -- it seems to fix the immediate issue, though there may be some downstream things to fix up.

The following snippet adding a new axis (adapted from a test) fails for what appears to be dimensionality-related reasons:

import dask.array as da
import numpy as np

def f(x):
    return x[:, None] * np.ones((1, 7))

x = da.ones(5, chunks=2)
y = da.blockwise(
    f, "aq", x, "a", new_axes={"q": 7}, concatenate=True, dtype=x.dtype
)
y.compute()
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-1-d61b664d7769> in <module>
      8     f, "aq", x, "a", new_axes={"q": 7}, concatenate=True, dtype=x.dtype
      9 )
---> 10 y.compute()

~/dask/dask/dask/base.py in compute(self, **kwargs)
    278         dask.base.compute
    279         """
--> 280         (result,) = compute(self, traverse=False, **kwargs)
    281         return result
    282 

~/dask/dask/dask/base.py in compute(*args, **kwargs)
    560     )
    561 
--> 562     dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
    563     keys, postcomputes = [], []
    564     for x in collections:

~/dask/dask/dask/base.py in collections_to_dsk(collections, optimize_graph, **kwargs)
    331             dsk, keys = _extract_graph_and_keys(val)
    332             groups[opt] = (dsk, keys)
--> 333             _opt = opt(dsk, keys, **kwargs)
    334             _opt_list.append(_opt)
    335 

~/dask/dask/dask/array/optimization.py in optimize(dsk, keys, fuse_keys, fast_functions, inline_functions_fast_functions, rename_fused_keys, **kwargs)
     46     dsk = optimize_blockwise(dsk, keys=keys)
     47     dsk = fuse_roots(dsk, keys=keys)
---> 48     dsk = dsk.cull(set(keys))
     49 
     50     if not config.get("optimization.fuse.active"):

~/dask/dask/dask/highlevelgraph.py in cull(self, keys)
    625         """
    626 
--> 627         all_ext_keys = self.get_all_external_keys()
    628         ret_layers = {}
    629         ret_key_deps = {}

~/dask/dask/dask/highlevelgraph.py in get_all_external_keys(self)
    516             self._all_external_keys = set()
    517             for layer in self.layers.values():
--> 518                 self._all_external_keys.update(layer.get_output_keys())
    519         return self._all_external_keys
    520 

~/dask/dask/dask/blockwise.py in get_output_keys(self)
    284             (self.output, *p)
    285             for p in itertools.product(
--> 286                 *[range(self.dims[i]) for i in self.output_indices]
    287             )
    288         }

~/dask/dask/dask/blockwise.py in <listcomp>(.0)
    284             (self.output, *p)
    285             for p in itertools.product(
--> 286                 *[range(self.dims[i]) for i in self.output_indices]
    287             )
    288         }

KeyError: '.1'

I'm not certain that this is related to the above issue, but it might be.

@rjzamora
Member

rjzamora commented Dec 4, 2020

I'm not certain that this is related to the above issue, but it might be.

Interesting - Not sure what is causing the problem off hand. I'll take a look

@rjzamora
Member

rjzamora commented Dec 4, 2020

Ah.. There was a copy-paste bug in the BlockwiseIO constructor that I just fixed in #6933 - I'm still getting an error for your example (albeit a different one). I'm still trying to figure out if the new error is another bug of mine :)

@ian-r-rose
Collaborator Author

Ah.. There was a copy-paste bug in the BlockwiseIO constructor that I just fixed in #6933 - I'm still getting an error for your example (albeit a different one). I'm still trying to figure out if the new error is another bug of mine :)

Thanks for looking into this @rjzamora

@ian-r-rose
Collaborator Author

Some follow-on difficulties with fusion of BlockwiseIO subgraphs: optimize_blockwise is still losing tasks in some cases. A minimal example:

import dask.array as da

x = da.full(2, 7, chunks=(2,))
y = da.full(2, 10, chunks=(2,))
c = da.add(x,y)
c.compute(optimize_graph=False) # Succeeds
c.compute(optimize_graph=True) # Fails (the resulting graph is empty)

Any thoughts as to what might be happening @rjzamora?

@rjzamora
Member

rjzamora commented Dec 9, 2020

Any thoughts as to what might be happening @rjzamora?

Hmmm - I will look into this. It may be that this is the first case where we are trying to fuse two BlockwiseIO layers together, and we are losing one of the IO subgraphs. It may turn out that we need to support a collection of IO subgraphs in BlockwiseIO.
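A toy illustration of that suspected failure mode (not dask's real data structures): if a fused layer carries a single IO-subgraph slot, fusing two IO layers silently drops one input's creation tasks.

```python
# Two independent IO layers, each carrying its own chunk-creation task.
layer_x = {"x-0": "make-x"}
layer_y = {"y-0": "make-y"}

# Naive fuse that keeps only one subgraph slot loses y's task entirely,
# which would leave the fused graph missing keys (an empty/short graph).
fused_bad = dict(layer_x)
assert "y-0" not in fused_bad

# Carrying a collection of IO subgraphs preserves both inputs:
fused_good = {**layer_x, **layer_y}
assert {"x-0", "y-0"} <= set(fused_good)
```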

@ian-r-rose
Collaborator Author

Okay @rjzamora, here is the next blockwise failure case: array creation -> map_blocks with broadcasting. I think this is verging on a case where we might not want to do it blockwise (or defer that until later), but it should at least not error:

import dask.array as da

def func(x, y, z):
    return x + y + z

a = da.ones((3, 4), chunks=(3, 2))
b = da.ones((6, 2), chunks=(3, 2))
c = da.ones((4,), chunks=(2,))
d = da.map_blocks(func, a, b, c, chunks=((3, 3), (2, 2)), dtype=a.dtype)
d.compute()
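For context, the per-block computation in `func` relies on ordinary NumPy broadcasting, which is easy to check standalone:

```python
import numpy as np

# One block from each input: a and b contribute (3, 2)-shaped blocks,
# while c contributes a (2,)-shaped block that broadcasts along axis 0.
out = np.ones((3, 2)) + np.ones((3, 2)) + np.ones((2,))
assert out.shape == (3, 2)
```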

@ian-r-rose
Collaborator Author

Or, an even simpler example that involves some inter-block communication:

import dask.array as da

y = da.ones((2, 2), chunks=1)
y.transpose().compute()

It seems to fail for similar reasons to the above -- some arg substitution isn't happening.
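At the graph level, transpose only permutes block indices, so every creation task must remain reachable after fusion; a minimal sketch of that index mapping:

```python
# Output block (i, j) of a 2-D transpose reads input block (j, i).
numblocks = (2, 2)
mapping = {
    (i, j): (j, i)
    for i in range(numblocks[0])
    for j in range(numblocks[1])
}
assert mapping[(0, 1)] == (1, 0)
# Every input block is still consumed by exactly one output block:
assert set(mapping.values()) == set(mapping)
```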

@ian-r-rose ian-r-rose force-pushed the blockwise-array-creation branch from c5d06c4 to 502218f Compare December 11, 2020 19:25
@rjzamora
Member

It seems to fail for similar reasons to the above -- some arg substitution isn't happening

Thanks @ian-r-rose! It looks like I may just need to do task substitution a bit "less efficiently" (and not worry about knowing the specific indexing rule to search for) until I can work out a more clever approach.

@rjzamora
Member

rjzamora commented Dec 11, 2020

@ian-r-rose - I submitted #6959 to (hopefully) address the latest bug you discovered. I would have pushed to this PR, but I don't believe I have permissions to do that.

@ian-r-rose ian-r-rose force-pushed the blockwise-array-creation branch from 502218f to 0828928 Compare December 11, 2020 20:47
@ian-r-rose
Collaborator Author

ian-r-rose commented Dec 11, 2020

@ian-r-rose - I submitted #6959 to (hopefully) address the latest bug you discovered. I would have pushed to this PR, but I don't believe I have permissions to do that.

Hmm, I have the box checked that allows for maintainers to push commits. Perhaps there was a conflict? (n.b., I just rebased and force pushed)

@rjzamora
Member

Hmm, I have the box checked that allows for maintainers to push commits.

Right - I don't have maintainer/write privileges :)

@ian-r-rose ian-r-rose force-pushed the blockwise-array-creation branch from 0828928 to 8055a47 Compare December 11, 2020 22:40
@ian-r-rose ian-r-rose changed the title [WIP] Initial pass at blockwise array creation routines. Initial pass at blockwise array creation routines. Dec 11, 2020
Co-authored-by: Richard (Rick) Zamora <rzamora217@gmail.com>
Member

@rjzamora rjzamora left a comment

This looks good @ian-r-rose - Thanks for the work here!

I made some minor suggestions, but I am also happy with the PR as is.

ndim = len(self.chunks)
chunk = tuple(self.chunks[i][idx[i]] for i in range(ndim))

return (self.func, chunk)
Member

This looks correct, and makes sense - You are using partial to wrap all options into self.func, because it is the same for every chunk. The only argument that may vary between two different chunks is the shape of that "chunk". Please correct me if I am missing anything :)

My suggestion is to add a sentence or two in the docstring to explain/document this assumption.
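For readers following along, here is a worked example of the chunk-shape lookup in the snippet above, standalone and with made-up numbers:

```python
chunks = ((3, 3), (2, 2))   # a (6, 4) array split into (3, 2)-shaped blocks
idx = (1, 0)                # block position within the chunk grid

# Same lookup as in the __getitem__ snippet: one chunk size per dimension.
ndim = len(chunks)
chunk = tuple(chunks[i][idx[i]] for i in range(ndim))
assert chunk == (3, 2)
```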

Collaborator Author

@rjzamora You are right -- I'm currently thinking through whether this needs to be more flexible for functions beyond those initially implemented here. At the end of the day, we need to be able to ascertain everything we need from the key, but more info may need to be bound to any given BlockwiseIO to get stuff like from_array working.

Member

but more info may need to be bound to any given BlockwiseIO to get stuff like from_array working.

Good point - CreateArraySubgraph + BlockwiseCreateArray does not target the case where you have input data. I'm unsure whether we want to use the same code path for from_array (maybe we do). For now, I'm okay with the solution focusing on pure "creation."

Collaborator Author

I'm wondering if it would make sense to pull out something like this logic for block_info and make it an optional argument to func.
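One possible shape for that idea (a purely hypothetical helper, not what the PR implements): inspect the function's signature and pass block metadata only when the function opts in to it, similar in spirit to how `map_blocks` handles `block_info`.

```python
import inspect

def call_with_block_info(func, shape, block_info):
    # Pass block metadata only if the function accepts a `block_info` kwarg.
    if "block_info" in inspect.signature(func).parameters:
        return func(shape, block_info=block_info)
    return func(shape)

def plain(shape):
    return ("ones", shape)

def aware(shape, block_info=None):
    return ("ones", shape, block_info["chunk-location"])

info = {"chunk-location": (0, 1)}
assert call_with_block_info(plain, (2, 2), info) == ("ones", (2, 2))
assert call_with_block_info(aware, (2, 2), info) == ("ones", (2, 2), (0, 1))
```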

@ian-r-rose
Collaborator Author

Thanks for the review @rjzamora. I think there will need to be some follow-up work for things like from_array, but this is a decent checkpoint.

@jrbourbeau
Member

Nice green check marks @ian-r-rose @rjzamora! I'll make sure to look through the changes here tomorrow

Member

@jrbourbeau jrbourbeau left a comment

Thanks for all your work on this @ian-r-rose @rjzamora!

@jrbourbeau jrbourbeau merged commit da7b56d into dask:master Dec 17, 2020
@ian-r-rose
Collaborator Author

Thanks for the review @jrbourbeau, and thanks for all the back-and-forth @rjzamora!

@ian-r-rose ian-r-rose mentioned this pull request Dec 17, 2020
@dcherian
Collaborator

This seems to have broken some xarray tests: pydata/xarray#4703. KeyErrors are raised for the keys all-aggregate and blockwise-create-zeros.

Sorry I don't have a minimal example yet.

@rjzamora
Member

Thanks @dcherian!

It looks like blockwise fusion is getting tripped up in some cases - I'm looking into it now.
