WIP: Support using vindex with Dask Arrays #3210

jakirkham wants to merge 4 commits into dask:master.
cc @shoyer
I took a quick look at this but didn't immediately understand what was going on. I'll try again in a while. In the meantime, if you have time to write a docstring or some more comments on how this works, I think that it would be worthwhile.
I just ran my example, and got an exception:

```python
import numpy as np
import dask.array as da

lut = np.random.rand(1024) * 100
dlut = da.from_array(lut, chunks=1024)
idx = np.random.randint(1024, size=(1000, 1000))
print(dlut[idx.ravel()])
# -> dask.array<getitem, shape=(1000000,), dtype=float64, chunksize=(1000000,)>
didx = da.from_array(idx, chunks=1000)
dlut.vindex[didx]
```

Got this:
Yeah, I haven't tried to support anything beyond 1-D arrays. Wasn't actually clear whether they were supported. As a workaround:

```python
dlut.vindex[didx.ravel()].reshape(didx.shape)
```

Edit: ...at least that worked for me locally.
Yes, this is basically what we already do for NumPy arrays input to vindex. See these lines for the relevant logic, which could probably be slightly modified to work on dask array indexers, too (lines 3459 to 3473 in bcaf86e).
Yeah I hear that. If you have suggestions to make the code clearer/simpler, they would certainly be welcome. Here's the rough gist of the code.
```python
x = asarray(x)

oth_inds = []
arr_inds = []
```
Are these short for other and array or something else? If so I recommend adding the extra two characters here. I think that the cost of extra typing here does not outweigh the loss of information.
Yes. Was trying to keep line length short later on. That said, no strong attachment to this or another variable name.
dask/array/core.py (Outdated)
```python
def _vindex_dask_arrays(x, *indices):
    """Point wise indexing with Dask Arrays"""
```
Can I request a docstring or comment here?
What else would you like to see here (meaning content-wise)?
Tried to add some prose here in the hopes of clarifying things. Please let me know what you think.
dask/array/core.py (Outdated)
```python
out_axes = tuple(out_axes)

if not all(o == slice(None) for o in oth_inds):
    x = x.vindex[oth_inds]
```
OK, so we do this in two passes, first with other_indices and then with the array_indices?
Right. Sorry should have been explicit about this before.
We do the first pass to deal with everything that is static (e.g. known single indexes, slices, lists, NumPy arrays). Then we do a second pass with everything that is dynamic (i.e. Dask Arrays of known and unknown length).
Assumed there are use cases (xarray?) where being able to explicitly select portions of the graph to keep is important. If not, we could adjust our atop call to handle these as well.
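The split into a static first pass and a dynamic second pass can be sketched roughly as follows. This is a hypothetical helper for illustration only (not the PR's actual code); anything that is not a static index stands in for a dask array here, so the example runs without dask installed.

```python
import numpy as np

def split_indices(indices):
    """Hypothetical sketch: split a mixed index tuple into a static pass
    (ints, slices, lists, NumPy arrays) and a dynamic pass (everything
    else, e.g. dask arrays), padding each with slice(None) placeholders."""
    static, dynamic = [], []
    for ind in indices:
        if isinstance(ind, (int, slice, list, np.ndarray)):
            static.append(ind)            # resolved eagerly in pass one
            dynamic.append(slice(None))   # nothing left to do in pass two
        else:
            static.append(slice(None))    # deferred: keep the full axis
            dynamic.append(ind)           # resolved lazily in pass two
    return tuple(static), tuple(dynamic)

class FakeDaskArray:  # stand-in for a dask array indexer
    pass

lazy = FakeDaskArray()
static, dynamic = split_indices((np.array([0, 2]), lazy, slice(1, 3)))
```

The first tuple can then be applied with ordinary (static) indexing, and the second tuple with the atop-based machinery.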
Yes, there are probably use cases where the ability to identify the right chunks ahead of time would make a big difference for efficiency.
Yep, that would not surprise me. :)
Might do some light refactoring in here to make the static and dynamic passes more obvious.
Added a comment before this to make the first pass obvious. Also explained in the docstring now.
```python
    ]),
    dtype=x.dtype,
    concatenate=True
)
```
I definitely don't understand what's going on here, but that's also pretty typical for non-trivial uses of atop in the codebase, much of which is my fault. This is a good learning experience for me to be on the other end of this. I was trying to think of ways to help with this situation. Perhaps comments for simple cases as in this snippet in unify_chunks?
```python
arginds = [(asarray(a) if ind is not None else a, ind)
           for a, ind in partition(2, args)]  # [x, ij, y, jk]
args = list(concat(arginds))  # [(x, ij), (y, jk)]
```
Yeah, it's a bit ugly. Have been trying to think of ways to make it clearer. Open to adding comments if that would help.

Basically we pass in our array (probably obvious), an arange along each dimension in x, and then either a 1-D Dask Array that selects points or slice(None) (if there is no selection or if it was handled in the first pass). Also, we interleave the arange and the 1-D selection, which makes this ugly here, but easier to work with in _vindex_dask_arrays_chunk. Expect these last two parts make things confusing.
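The interleaving described here can be sketched as follows. Names are hypothetical and plain NumPy aranges stand in for the per-dimension position arrays; this is only an illustration of the argument layout, not the PR's atop call itself.

```python
import numpy as np

def interleave_atop_args(x_shape, selectors):
    """Hypothetical sketch: for each dimension of x, pair an arange of
    that dimension's global positions with the 1-D selector for that
    dimension (or slice(None) when nothing dynamic selects along it),
    mirroring the interleaved argument list handed to atop."""
    args = []
    for n, sel in zip(x_shape, selectors):
        args.append(np.arange(n))  # global positions along this dimension
        args.append(sel)           # 1-D point selector, or slice(None)
    return args

# dynamic selection along axis 0 only; axis 1 already handled
args = interleave_atop_args((4, 6), (np.array([3, 0]), slice(None)))
```

Inside the chunk function, consecutive pairs can then be consumed together to match requested global positions against each chunk's positions.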
I'm a little concerned about the algorithm here. If I understand it correctly, atop here is mapping over all possible pairs of chunks on the indexed and indexing array. This will work okay for small numbers of chunks, but could quickly become intractably slow for larger numbers of chunks.
That said, I can't think of any obvious ways to improve this within dask's data model. You really need to compute some of the indices to know which source chunks they index, but dask (at least the non-distributed version) does not have support for that sort of dynamic graph construction.
It may be that it's always better to collapse a chunked index dask-array into a single chunk. This would increase our work-per-chunk but decrease the scheduling overhead. In the common case of single-dimensional indexers, I suspect that this is almost always a good idea.
Tried to add some information about what happens here in a docstring for this function and in a comment above. Suggestions welcome.
Certainly aware that there could be performance problems with large numbers of chunks and would be happy to see them improved. Though it is not unique to this case. That said, it feels like this issue is outside the scope of this PR at present.
```python
assert_eq(result, x[2, [1, 2, 7, 7]])

result = d.vindex[(d % 3).nonzero()]
assert_eq(result, x[(x % 3).nonzero()])
```
It would be reassuring to see more cases where the values of the array are:
- out of order
- chunked differently
- negative
Sorry, could you please explain what you mean by "out of order"?
Have added a couple of explicit tests like this. Interestingly, the nonzero case also captured these issues. Though explicit test cases seem preferable.
```python
    return idx_vals


def _vindex_dask_arrays_chunk(xc, *args):
```
It would be helpful if this function had a docstring with example input/output.
Added a docstring. Missed the example input/output part when writing the docs/comments. Sorry. Happy to add that as well in a follow-up.
In the interim it would be good to know if the docs/comments added help clarify this function and its arguments. Also would be good to hear how to better present that information.
dask/array/core.py (Outdated)
```python
xc_inds = []
for i, (e_0, e_i) in enumerate(zip(inds, x_inds)):
    if isinstance(e_0, np.ndarray):
        xc_inds.append(np.equal.outer(e_0, e_i).nonzero()[1])
```
I think np.in1d could be much faster here (O(n log n) vs O(n^2)). The current code requires allocating an array for the full outer product of the indices.
Sure, that makes sense.
When I previously was playing with this in the gist, it used only Dask Array operations without atop. So when pushing it all into atop, I expect some things from the old approach remain that are suboptimal.
Happy to fix any that come up.
Looking back at it, I think the current code was chosen to preserve ordering. For example...

```python
>>> import numpy as np
>>> e_0 = np.array([23, 21, 24])
>>> e_i = np.arange(21, 25)
>>> np.equal.outer(e_0, e_i).nonzero()[1]
array([2, 0, 3])
>>> np.in1d(e_i, e_0).nonzero()[0]
array([0, 2, 3])
```

Not sure if there is an obvious way to get this from in1d or a similarly performant function, but would be happy to try. Expect this would be a good point to improve.
If we're OK with a pandas dependency (which we would probably rather avoid in dask.array), this version does the lookup in linear time with a hash table:

```python
In [14]: import pandas as pd
In [15]: pd.Index(e_i).get_indexer(e_0)
Out[15]: array([2, 0, 3])
```

It looks like this could also be done with np.sort()/np.searchsorted(), which means O(n log n + k log n) time and O(n + k) space:

```python
In [24]: np.searchsorted(np.sort(e_i), e_0)
Out[24]: array([2, 0, 3])
```

(you should add tests/think carefully about how to properly handle repeated indices)
Thanks for the tip. Will give that a look.
So we do have tests for repeated indices in this PR. Please let me know if we are missing something in particular there.
I should add that we don't have tests for non-sequential indices, which we need. Happy to add those.
FWICT searchsorted works well. We can skip the sort as e_i is always sorted. Updated the PR accordingly. Thanks again for the suggestion.
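For reference, a small check (reusing the e_0/e_i values from the thread above) that searchsorted over the already-sorted chunk positions reproduces the order-preserving outer-product result:

```python
import numpy as np

# e_i: sorted global positions covered by a chunk; e_0: requested indices
e_i = np.arange(21, 25)
e_0 = np.array([23, 21, 24])

# O(n*k) outer-product lookup; preserves the order of e_0
local_outer = np.equal.outer(e_0, e_i).nonzero()[1]

# O(k log n) lookup; e_i is already sorted, so np.sort is unnecessary
local_ss = np.searchsorted(e_i, e_0)

print(local_outer, local_ss)  # both give array([2, 0, 3])
```

Note this equivalence relies on every requested index actually appearing in e_i; indices falling outside the chunk need to be filtered out beforehand.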
@jakirkham I've been following this PR with interest, and have tried it out this morning, but I get a MemoryError with the following:

```python
import dask.array as da

arr = da.arange(10000*10000, chunks=2000*2000)
index = da.random.uniform(0, 10000*10000, 3000*3000, chunks=2000*2000).astype(int)
arr.vindex[index].compute()
```

I have 16GB of memory on my computer, so I'm a bit puzzled :) Anything I can help with?
Thanks for the ping @mraspaud. Sorry I've been slow to push this. Have been a bit busy of late. Thanks for the nice example. Ran into the same issue. It points back at the same LOC @shoyer mentioned above. Expect the issue is simply allocating the 2-D array in the equality comparison, which we could avoid in different ways. Independently there is a question about improving algorithmic performance there, which is worth considering. Feel free to share other thoughts in that thread.
Have rebased and tweaked a bit now that PR ( #3353 ) is in. Code should be a bit simpler and easier to understand. Feedback welcome.
Change `vindex` so that it can support using Dask Arrays for indexing. Does this using `atop` to map Dask Array indices to those representing particular values in the chunk. This allows individual values to be pulled from each chunk and combined into the total result. Comes with the added bonus that this will work whether the Dask Arrays providing coordinates have a known length or an unknown length.
Provides docstrings that try to give an overall picture of what is happening in the code to handle Dask Arrays globally, locally (within chunks), and when combined with arguments of known quantities. Adds comments near confusing sections to hopefully clarify what is happening.
Adds a few tests of using vindex with Dask Arrays. These include mixed usage of Dask Arrays and other content. Also includes using Dask Arrays with unknown shape.
```python
x, tuple(range(1, 1 + x.ndim)),
*concat([
    (e_0, (0,), e_i, (i,)) if isinstance(e_0, Array) else
    (e_0, None, e_i, (i,))
```
Consider writing the more succinct: `(e_0, (0,) if isinstance(e_0, Array) else None, e_i, (i,))`
Yeah I went back and forth on these. The current approach seems nice as you can easily contrast what you will get as a tuple.
```python
*concat([
    (e_0, (0,), e_i, (i,)) if isinstance(e_0, Array) else
    (e_0, None, e_i, (i,))
    for i, (e_0, e_i) in enumerate(zip(arr_inds, x_inds), start=1)
```
I don't understand the variable names `e_0` and `e_i`. I think using variable names like `ref_ind` and `new_ind` would make all this code easier to understand.
```python
else:
    arr_inds.append(ind)
if 0 not in out_axes:
    out_axes.append(0)
```
I was puzzled by this at first, and then I realized that this function only supported 1d dask arrays in the indexer currently.
Consider extending this to support arbitrary dimensionality inputs with broadcasting, or at least raise an informative error message.
I think broadcasting would actually be pretty easy, with broadcast_arrays(), out_axes in atop like what I did in isin and ravel/reshape around searchsorted for handling the chunk indices.
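Shown with plain NumPy for brevity, the broadcasting + ravel/reshape pattern being suggested looks roughly like this. The helper name is hypothetical and this is not code from the PR; the dask version would additionally have to cope with unknown shapes.

```python
import numpy as np

def pointwise_nd(arr, *indexers):
    """Hypothetical sketch: broadcast N-D integer indexers against each
    other, flatten them down to the supported 1-D pointwise case, then
    reshape the result back to the broadcast shape."""
    bcast = np.broadcast_arrays(*indexers)
    flat = tuple(b.ravel() for b in bcast)   # 1-D pointwise selection
    return arr[flat].reshape(bcast[0].shape)

arr = np.arange(12).reshape(3, 4)
rows = np.array([[0], [2]])   # shape (2, 1), broadcasts against cols
cols = np.array([[1, 3]])     # shape (1, 2)
out = pointwise_nd(arr, rows, cols)  # shape (2, 2)
```

The reshape at the end is what makes this tricky for dask arrays of unknown shape, since the broadcast shape may not be known ahead of time.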
So I tried to extend this to N-D arrays, but ran into some issues with index arrays that have unknown shape.
```python
    for i, (e_0, e_i) in enumerate(zip(arr_inds, x_inds), start=1)
    ]),
    dtype=x.dtype,
    concatenate=True
```
Something to watch out for: this operation is lazy, but isn't really distributed friendly. If I understand correctly, concatenate=True means that the indexing arrays essentially get concatenated together across chunks. So things will break if your indexing arrays are too big to fit into memory.
This could conceivably be avoided, but it would require some tricky logic to merge across chunks.
Alright, will give this some thought. Thanks.
AFAICT concatenate=True is just a convenience option that gives one a single NumPy array instead of some list of NumPy arrays that atop would otherwise provide to the kernel function. ( #1609 ) The amount of data given in each case (concatenate=False/concatenate=True) is the same. Historically it sounds like that wasn't always true.
There may be some argument to not performing the concatenation as the memory usage doubles in the process (at least temporarily). That said, one normally chooses chunk sizes knowing that copies may be made (maybe even a few). So this doesn't seem unreasonable to me.
concatenate=True hasn't changed, and indeed atop has always processed multiple inputs at once when output indices are not available. But loading all input chunks corresponding to dropped output dimensions can still be problematic.

Looking at this again, I think this actually consolidates all indexed chunks along these dimensions, e.g., with array.vindex[key], the whole of array could potentially get computed. This would be problematic in the case where array corresponds to a large tiled image, e.g., a 100k x 100k matrix with chunks of size 2000 x 2000.

That said, I don't think dask (without distributed) is capable of handling this in any sane way, since it really requires dynamic task creation. Otherwise you're stuck with all combinations of indexed and indexer chunks, which will quickly get overwhelming for moderately sized numbers of chunks.
Trying to wrap my head around it, but I still can't understand if there are differences in performance and/or capability from my take on it (#3407). My implementation is also only supporting 0-D and 1-D indexes, but it's designed to be distributed-friendly (you'll get as many output chunks as there are chunks in the index) and very parsimonious with RAM.
So one of the things this tries to tackle is allowing Dask Arrays of unknown length to be applied as well.

Independently there is a question as to whether this should use [...].

Some of the discussion here is about handling N-D indexing arrays. Thus far that seems to be at odds with supporting unknown length/shape.

Unfortunately have not had time to look at your proposal in-depth. So don't know if there are other important points where they differ or whether there are other things to consider in it.
Closing in favor of PR ( #3407 ), which seems to have superseded this work. People interested in this PR are encouraged to take a look at that one and share thoughts there.
Fixes #3096

Change `vindex` so that it can support using Dask Arrays for indexing. Does this using `atop` to map Dask Array indices to those representing particular values in the chunk. This allows individual values to be pulled from each chunk and combined into the total result. Comes with the added bonus that this will work whether the Dask Arrays providing coordinates have a known length or an unknown length.