
WIP: Support using vindex with Dask Arrays#3210

Closed
jakirkham wants to merge 4 commits into dask:master from jakirkham:support_dask_arrays_vindex

Conversation

@jakirkham
Member

@jakirkham jakirkham commented Feb 26, 2018

Fixes #3096

Change vindex so that it can support using Dask Arrays for indexing. It does this using atop to map Dask Array indices to those representing particular values in each chunk, allowing individual values to be pulled from each chunk and combined into the total result. As an added bonus, this works whether the Dask Arrays providing coordinates have a known or unknown length.

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch 5 times, most recently from b12d762 to ed0d219 Compare February 26, 2018 16:17
@jakirkham jakirkham changed the title WIP: Support using vindex with Dask Arrays Support using vindex with Dask Arrays Feb 26, 2018
@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch 3 times, most recently from 9788a0b to 83ea719 Compare February 26, 2018 16:32
@mrocklin
Member

cc @shoyer

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch from 83ea719 to d0281e6 Compare February 26, 2018 17:20
@mrocklin
Member

I took a quick look at this but didn't immediately understand what was going on. I'll try again in a while. In the meantime, if you have time to write a docstring or some more comments on how this works, I think that it would be worthwhile.

@mraspaud

I just ran my example, and got an exception:

import numpy as np
import dask.array as da

lut = np.random.rand(1024) * 100
dlut = da.from_array(lut, chunks=1024)
idx = np.random.randint(1024, size=(1000, 1000))
print(dlut[idx.ravel()])
# -> dask.array<getitem, shape=(1000000,), dtype=float64, chunksize=(1000000,)>

didx = da.from_array(idx, chunks=1000)
dlut.vindex[didx]

Got this:

ValueError: Chunks and shape must be of the same length/dimension. Got chunks=((1000,),), shape=(1000, 1000)

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch from d0281e6 to 19a4e5e Compare February 26, 2018 19:32
@jakirkham
Member Author

jakirkham commented Feb 26, 2018

Yeah, I haven't tried to support anything beyond 1-D arrays. Wasn't actually clear whether they were supported by vindex, but I guess they are for NumPy arrays. Don't think I'll have time to support every possible Dask Array combination. Could probably get something like this to work though...

dlut.vindex[didx.ravel()].reshape(didx.shape)

Edit: ...at least that worked for me locally.
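The workaround relies on a plain NumPy identity: pointwise indexing with an N-D index array gives the same values as indexing with its ravel and then restoring the index's shape. A minimal NumPy sketch of that equivalence:

```python
import numpy as np

lut = np.arange(10) * 10
idx = np.array([[3, 1], [7, 7]])

# index with the flattened array, then reshape to the index's shape
flat = lut[idx.ravel()].reshape(idx.shape)
assert np.array_equal(flat, lut[idx])
```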

@shoyer
Member

shoyer commented Feb 26, 2018

Don't think I'll have time to support every possible Dask Array combination. Could probably get something like this to work though...
dlut.vindex[didx.ravel()].reshape(didx.shape)

Yes, this is basically what we already do for NumPy arrays input to vindex. See these lines for the relevant logic, which could probably be slightly modified to work on dask array indexers, too:

dask/dask/array/core.py

Lines 3459 to 3473 in bcaf86e

try:
    broadcast_indexes = np.broadcast_arrays(*array_indexes.values())
except ValueError:
    # note: error message exactly matches numpy
    shapes_str = ' '.join(str(a.shape) for a in array_indexes.values())
    raise IndexError('shape mismatch: indexing arrays could not be '
                     'broadcast together with shapes ' + shapes_str)
broadcast_shape = broadcast_indexes[0].shape
lookup = dict(zip(array_indexes, broadcast_indexes))
flat_indexes = [lookup[i].ravel().tolist() if i in lookup else None
                for i in range(len(indexes))]
flat_indexes.extend([None] * (x.ndim - len(flat_indexes)))
result_1d = _vindex_1d(x, *flat_indexes)
return result_1d.reshape(broadcast_shape + result_1d.shape[1:])
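A NumPy illustration of what those lines do (variable names here are hypothetical, not the ones in core.py): broadcast the indexers to a common shape, flatten them for the 1-D machinery, then reshape the result back.

```python
import numpy as np

x = np.arange(12).reshape(3, 4)
rows = np.array([[0], [2]])   # shape (2, 1)
cols = np.array([0, 1, 2])    # shape (3,)

# broadcast the indexers together, flatten, index, then reshape
b_rows, b_cols = np.broadcast_arrays(rows, cols)
result_1d = x[b_rows.ravel(), b_cols.ravel()]
result = result_1d.reshape(b_rows.shape)
assert np.array_equal(result, x[rows, cols])
```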

@jakirkham
Member Author

I took a quick look at this but didn't immediately understand what was going on. I'll try again in a while. In the mean time if you have time to write a docstring or some more comments on how this works I think that it would be worthwhile.

Yeah I hear that. If you have suggestions to make the code clearer/simpler, they would certainly be welcome.

Here's the rough gist of the code.

  1. Find indices for all points in the Dask Array reference frame.
  2. Pull relevant pieces from these indices into each chunk.
  3. Select out relevant matching indices in each chunk.
  4. Convert them to chunk relative indices.
  5. Pull out matching values from each chunk.
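The steps above can be sketched for a single 1-D chunk in plain NumPy (everything here, including the helper's name and signature, is hypothetical and only meant to mirror the idea, not the PR's actual code):

```python
import numpy as np

def vindex_chunk_sketch(chunk, chunk_offset, global_idx):
    """Toy per-chunk logic: pick out the requested points this chunk owns."""
    # steps 1-2: positions this chunk owns in the global reference frame
    owned = np.arange(chunk_offset, chunk_offset + len(chunk))
    # step 3: which requested points fall inside this chunk
    hits = (global_idx >= owned[0]) & (global_idx <= owned[-1])
    # step 4: convert matching global indices to chunk-relative ones
    local = global_idx[hits] - chunk_offset
    # step 5: pull out the matching values
    return chunk[local]

chunk = np.array([10, 20, 30, 40])           # owns global positions 4..7
picked = vindex_chunk_sketch(chunk, 4, np.array([5, 7, 2]))
```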

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch from 19a4e5e to a84cf8c Compare February 27, 2018 18:21
x = asarray(x)

oth_inds = []
arr_inds = []
Member

Are these short for other and array or something else? If so I recommend adding the extra two characters here. I think that the cost of extra typing here does not outweigh the loss of information.

Member Author

Yes. Was trying to keep line length short later on. That said, no strong attachment to this or another variable name.



def _vindex_dask_arrays(x, *indices):
"""Point wise indexing with Dask Arrays"""
Member

Can I request a docstring or comment here?

Member Author

@jakirkham jakirkham Mar 28, 2018

What else would you like to see here (meaning content-wise)?

Member Author

Tried to add some prose here in the hopes of clarifying things. Please let me know what you think.

out_axes = tuple(out_axes)

if not all(o == slice(None) for o in oth_inds):
    x = x.vindex[oth_inds]
Member

OK, so we do this in two passes, first with other_indices and then with the array_indices?

Member Author

Right. Sorry, I should have been explicit about this before.

We do the first pass to deal with everything that is static (e.g. known single indexes, slices, lists, NumPy arrays). Then we do a second pass with everything that is dynamic (i.e. Dask Arrays of known and unknown length).

Assumed there are use cases (xarray?) where being able to explicitly select portions of the graph to keep is important. If not, we could adjust our atop call to handle these as well.
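The two-pass idea can be demonstrated in plain NumPy: applying the static indexers first and the remaining pointwise index second gives the same result as combined indexing (a sketch, not the PR's code path):

```python
import numpy as np

x = np.arange(24).reshape(4, 6)
idx = np.array([5, 0, 3])

# pass 1: the static slice is applied up front
# pass 2: the "dynamic" pointwise index goes against the reduced array
two_pass = x[1:3][:, idx]
one_pass = x[1:3, idx]
assert np.array_equal(two_pass, one_pass)
```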

Member

Yes, there are probably use cases where the ability to identify the right chunks ahead of time would make a big difference for efficiency.

Member Author

Yep, that would not surprise me. :)

Might do some light refactoring in here to make the static and dynamic passes more obvious.

Member Author

Added a comment before this to make the first pass obvious. Also explained in the docstring now.

]),
dtype=x.dtype,
concatenate=True
)
Member

I definitely don't understand what's going on here, but that's also pretty typical for non-trivial uses of atop in the codebase, much of which is my fault. This is a good learning experience for me to be on the other end of this. I was trying to think of ways to help with this situation. Perhaps comments for simple cases as in this snippet in unify_chunks?

    arginds = [(asarray(a) if ind is not None else a, ind)
               for a, ind in partition(2, args)]  # [x, ij, y, jk]
    args = list(concat(arginds))  # [(x, ij), (y, jk)]

Member Author

Yeah, it's a bit ugly. Have been trying to think of ways to make it clearer. Open to adding comments if that would help.

Basically we pass in our array (probably obvious), an arange along each dimension in x, and then either a 1-D Dask Array that selects points or slice(None) (if there is no selection or if it was handled in the first pass). Also we interleave the arange and the 1-D selection, which makes this ugly here, but easier to work with in _vindex_dask_arrays_chunk. Expect these last two parts make things confusing.

Member

I'm a little concerned about the algorithm here. If I understand it correctly, atop here is mapping over all possible pairs of chunks on the indexed and indexing array. This will work okay for small numbers of chunks, but could quickly become intractably slow for larger numbers of chunks.

That said, I can't think of any obvious ways to improve this within dask's data model. You really need to compute some of the indices to know which source chunks they index, but dask (at least the non-distributed version) does not have support for that sort of dynamic graph construction.

Member

It may be that it's always better to collapse a chunked index dask-array into a single chunk. This would increase our work-per-chunk but decrease the scheduling overhead. In common the case of single-dimensional indexers I suspect that this is almost always a good idea.

Member Author

Tried to add some information about what happens here in a docstring for this function and in a comment above. Suggestions welcome.

Certainly aware that there could be performance problems with large numbers of chunks and would be happy to see them improved. Though it is not unique to this case. That said, it feels like this issue is outside the scope of this PR at present.

assert_eq(result, x[2, [1, 2, 7, 7]])

result = d.vindex[(d % 3).nonzero()]
assert_eq(result, x[(x % 3).nonzero()])
Member

It would be reassuring to see more cases where the values of the array are

  1. out of order
  2. chunked differently.
  3. negative
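For reference, the expected NumPy semantics for the out-of-order, repeated, and negative cases (a sketch of what such tests would assert against, using NumPy as the oracle; chunking variations only apply on the dask side):

```python
import numpy as np

x = np.arange(10) * 10

# out of order, with repeats: order and repeats are preserved
idx = np.array([7, 7, 1, 6])
assert np.array_equal(x[idx], [70, 70, 10, 60])

# negative indices wrap around, matching NumPy semantics
neg = np.array([-1, -10])
assert np.array_equal(x[neg], [90, 0])
```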

Member Author

Sorry, could you please explain what you mean by "out of order"?

Member

da.asarray([7, 7, 1, 6])

Member Author

Have added a couple of explicit tests like this. Interestingly, the nonzero case captured these issues too. Though explicit test cases seem preferable.

return idx_vals


def _vindex_dask_arrays_chunk(xc, *args):
Member

It would be helpful if this function had a docstring with example input/output.

Member Author

Added a docstring. Missed the example input/output part when writing the docs/comments. Sorry. Happy to add that as well in a follow-up.

In the interim it would be good to know if the docs/comments added help clarify this function and its arguments. Also would be good to hear how to better present that information.

xc_inds = []
for i, (e_0, e_i) in enumerate(zip(inds, x_inds)):
    if isinstance(e_0, np.ndarray):
        xc_inds.append(np.equal.outer(e_0, e_i).nonzero()[1])
Member

I think np.in1d could be much faster here (O(n log n) vs O(n^2)). This requires allocating an array for the full outer product of the indices.

Member Author

Sure, that makes sense.

When I previously was playing with this in the gist, it used only Dask Array operations without atop. So when pushing it all into atop, expect that there remain some things from the old approach that are suboptimal.

Happy to fix any that come up.

Member Author

Looking back at it, I think the current code was chosen to preserve ordering. For example...

>>> import numpy as np
>>> e_0 = np.array([23, 21, 24])
>>> e_i = np.arange(21, 25)
>>> np.equal.outer(e_0, e_i).nonzero()[1]
array([2, 0, 3])
>>> np.in1d(e_i, e_0).nonzero()[0]
array([0, 2, 3])

Not sure if there is an obvious way to get this from in1d or a similarly performant function, but would be happy to try. Expect this would be a good point to improve.

Member

@shoyer shoyer Mar 28, 2018

If we're OK with a pandas dependency (which we would probably rather avoid in dask.array), this version does the lookup in linear time with a hash-table:

In [14]: import pandas as pd

In [15]: pd.Index(e_i).get_indexer(e_0)
Out[15]: array([2, 0, 3])

It looks like this could also be done with np.sort()/np.searchsorted(), which means O(n log n + k log n) time and O(n+k) space:

In [24]: np.searchsorted(np.sort(e_i), e_0)
Out[24]: array([2, 0, 3])

(you should add tests/think carefully about how to properly handle repeated indices)
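For the values from the earlier example, the searchsorted approach preserves lookup order and tolerates repeats in the lookup array, since each lookup is independent (repeated values in the sorted array itself would still need care):

```python
import numpy as np

e_i = np.arange(21, 25)           # chunk's global positions, already sorted
e_0 = np.array([23, 21, 24, 23])  # requested points; order and repeats preserved
local = np.searchsorted(e_i, e_0)
```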

Member Author

Thanks for the tip. Will give that a look.

So we do have tests for repeated indices in this PR. Please let me know if we are missing something in particular there.

Should add we don't have tests for non-sequential indices, which we need. Happy to add those.

Member Author

FWICT searchsorted works well. We can skip the sort as e_i is always sorted. Updated the PR accordingly. Thanks again for the suggestion.


@mraspaud

@jakirkham I've been following this PR with interest, and have tried it out this morning, but I get a MemoryError with the following:

import dask.array as da
arr = da.arange(10000*10000, chunks=2000*2000)
index = da.random.uniform(0, 10000*10000, 3000*3000, chunks=2000*2000).astype(int)
arr.vindex[index].compute()

I have 16GB of memory on my computer, so I'm a bit puzzled :)

Anything I can help with ?

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch from a84cf8c to dd94b89 Compare March 28, 2018 05:42
@jakirkham
Member Author

Thanks for the ping @mraspaud. Sorry I've been slow to push this. Have been a bit busy of late.

Thanks for the nice example. Ran into the same issue. It points back at the same LOC @shoyer mentioned above.

Expect the issue is simply allocating the 2-D array in the equality comparison, which we could avoid in different ways. Independently there is a question about improving algorithmic performance there, which is worth considering. Feel free to share other thoughts in that thread.

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch 3 times, most recently from 1de41e5 to 80e8d17 Compare March 28, 2018 07:11
@jakirkham
Member Author

Thanks to @shoyer's suggestion the PR now uses searchsorted, which avoids the memory issue and is a bit more performant. Was able to run through your example code locally without issue, @mraspaud. Could you please retest when you have a chance and let us know how it goes?

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch from 80e8d17 to f94f6dc Compare March 28, 2018 07:50
@jakirkham jakirkham changed the title Support using vindex with Dask Arrays WIP: Support using vindex with Dask Arrays Mar 28, 2018
@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch 3 times, most recently from c13b550 to 35bee9c Compare March 29, 2018 14:50
@jakirkham
Member Author

jakirkham commented Mar 29, 2018

Now new and improved with support for N-D indices. :)

So @mraspaud's example now works.

Edit: Had to back these changes out as they were causing other tests to fail. :(

@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch 2 times, most recently from 83b551d to 035c06c Compare April 5, 2018 02:53
@jakirkham
Member Author

Have rebased and tweaked a bit now that PR ( #3353 ) is in. Code should be a bit simpler and easier to understand. Feedback welcome.

Change `vindex` so that it can support using Dask Arrays for indexing.
Does this using `atop` to map Dask Array indices to those representing
particular values in the chunk. Thus allowing individual values to be
pulled from each chunk and combined into the total result. Comes with
the added bonus that this will work whether the Dask Arrays providing
coordinates have a known length or an unknown length.
Provides docstrings that try to give an overall picture of what is
happening in the code to handle Dask Arrays globally, locally
(within chunks), and when combined with arguments of known quantities.
Adds comments near confusing sections to hopefully clarify what is
happening.
Adds a few tests of using vindex with Dask Arrays. These include mixed
usage of Dask Arrays and other content. Also includes using Dask Arrays
with unknown shape.
@jakirkham jakirkham force-pushed the support_dask_arrays_vindex branch from 035c06c to 9190782 Compare April 5, 2018 04:23
x, tuple(range(1, 1 + x.ndim)),
*concat([
    (e_0, (0,), e_i, (i,)) if isinstance(e_0, Array) else
    (e_0, None, e_i, (i,))
Member

Consider writing the more succinct: (e_0, (0,) if isinstance(e_0, Array) else None, e_i, (i,)))

Member Author

Yeah I went back and forth on these. The current approach seems nice as you can easily contrast what you will get as a tuple.

*concat([
    (e_0, (0,), e_i, (i,)) if isinstance(e_0, Array) else
    (e_0, None, e_i, (i,))
    for i, (e_0, e_i) in enumerate(zip(arr_inds, x_inds), start=1)
Member

I don't understand the variable names e_0 and e_i. I think using variable names like ref_ind and new_ind would make all this code easier to understand.

else:
    arr_inds.append(ind)
    if 0 not in out_axes:
        out_axes.append(0)
Member

I was puzzled by this at first, and then I realized that this function currently only supports 1-D dask arrays in the indexer.

Consider extending this to support arbitrary dimensionality inputs with broadcasting, or at least raise an informative error message.

I think broadcasting would actually be pretty easy: broadcast_arrays(), out_axes in atop like what I did in isin, and ravel/reshape around searchsorted for handling the chunk indices.

Member Author

So I tried to extend this to N-D arrays, but ran into some issues with index arrays that have unknown shape.

for i, (e_0, e_i) in enumerate(zip(arr_inds, x_inds), start=1)
]),
dtype=x.dtype,
concatenate=True
Member

Something to watch out for: this operation is lazy, but isn't really distributed friendly. If I understand correctly, concatenate=True means that the indexing arrays essentially get concatenated together across chunks. So things will break if your indexing arrays are too big to fit into memory.

This could conceivably be avoided, but it would require some tricky logic to merge across chunks.

Member Author

Alright, will give this some thought. Thanks.

Member Author

AFAICT concatenate=True is just a convenience option that gives one a single NumPy array instead of some list of NumPy arrays that atop would otherwise provide to the kernel function. ( #1609 ) The amount of data given in each case (concatenate=False/concatenate=True) is the same. Historically it sounds like that wasn't always true.

There may be some argument to not performing the concatenation as the memory usage doubles in the process (at least temporarily). That said, one normally chooses chunk sizes knowing that copies may be made (maybe even a few). So this doesn't seem unreasonable to me.

Member

concatenate=True hasn't changed, and indeed atop has always processed multiple inputs at once when output indices are not available. But loading all input chunks corresponding to dropped outputs can still be problematic.

Looking at this again, I think this actually consolidates all indexed chunks along these dimensions, e.g., with array.vindex[key], all of array could potentially get computed at once. This would be problematic in the case where array corresponds to a large tiled image, e.g., a 100k x 100k matrix with chunks of size 2000 x 2000.

That said, I don't think dask (without distributed) is capable of handling this in any sane way, since it really requires dynamic task creation. Otherwise you're stuck with all combinations of indexed and indexer chunks, which will quickly get overwhelming for moderately sized numbers of chunks.
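To make the scaling concrete, a back-of-the-envelope count of chunk pairs for the tiled-image example above (assuming one task per indexed/indexer chunk combination, with a hypothetical 100-chunk indexer):

```python
# 100k x 100k source array in 2000 x 2000 chunks
src_chunks = (100_000 // 2_000) ** 2   # 2500 source chunks
# a chunked 1-D indexer with, say, 100 chunks pairs with every source chunk
idx_chunks = 100
tasks = src_chunks * idx_chunks
print(tasks)  # 250000 chunk-pair tasks
```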

@crusaderky
Collaborator

Trying to wrap my head around it, but I still can't understand if there are differences in performance and/or capability compared to my take on it (#3407)? My implementation also only supports 0-D and 1-D indexes, but it's designed to be distributed-friendly (you'll get as many output chunks as there are chunks in the index) and very parsimonious with RAM.

@jakirkham
Member Author

So one of the things this tries to tackle is allowing Dask Arrays of unknown length to be applied as well. As a trivial example, d.vindex[d.nonzero()].

Independently there is a question as to whether this should use __getitem__ or vindex.

Some of the discussion here is about handling N-D indexing arrays. Thus far that seems to be at odds with supporting unknown length/shape.

Unfortunately have not had time to look at your proposal in-depth. So don’t know if there are other important points where they differ or whether there are other things to consider in it.

@jakirkham
Member Author

Closing in favor of PR ( #3407 ), which seems to have superseded this work. People interested in this PR are encouraged to take a look at that one and share thoughts in that PR.

@jakirkham jakirkham closed this Jun 27, 2018
@jakirkham jakirkham deleted the support_dask_arrays_vindex branch June 27, 2018 02:20