
Array slicing HighLevelGraph layer #7655

Open
GenevieveBuckley wants to merge 28 commits into dask:main from GenevieveBuckley:slicing-HLG

Conversation

@GenevieveBuckley
Contributor

Array slicing HighLevelGraph layer

FYI @ian-r-rose & @gjoseph92
I've made a start, inserting a HighLevelGraph layer into the slice_slices_and_integers function in dask/array/slicing.py.

The next step is to somehow propagate this upwards through the rest of the slicing functions, so that the graph is only materialized if absolutely necessary (e.g. for fancy indexing).

@github-actions github-actions bot added the array label May 14, 2021
@GenevieveBuckley
Contributor Author

It's not super clear what I should do next: work on "bubbling" the high level graph layer up? Find a specific use case to benchmark?

Cases like #5918 (comment) are unaffected by the changes here - should I try to target something like this?

@GenevieveBuckley
Contributor Author

For some context, I've added these methods:

  • _keys_to_parts
  • _cull_dependencies
  • cull
  • _cull - last one left, this private method is the one that should return a new instance of the layer (except with only the bits that are absolutely necessary for the computation).

I can't really test things well without all four methods in place, so once I've got _cull in we can see what is or isn't working quite right.

Also note: finding the length and/or HTML representation of the dask computation will materialize the full graph. I haven't changed this part yet. The good news is that it should be a lot simpler to calculate the length here without materializing the task dictionary than it is for our overlapping array work.
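To illustrate that last point: the number of output tasks in a slicing layer is just the product of the number of output chunks along each dimension, so a layer's __len__ can avoid building the task dict entirely. A minimal sketch (hypothetical class, not dask's actual Layer API):

```python
from math import prod


class SliceLayerSketch:
    """Hypothetical sketch: report layer length from chunk structure alone."""

    def __init__(self, blockdims):
        # blockdims: chunk sizes per dimension, e.g. ((3, 3), (3, 3))
        self.blockdims = blockdims

    def __len__(self):
        # one output task per output chunk; no materialized task dict needed
        return prod(len(dim_chunks) for dim_chunks in self.blockdims)


# a (6, 6) array with (3, 3) chunks has 2 x 2 = 4 output tasks
assert len(SliceLayerSketch(((3, 3), (3, 3)))) == 4
```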

@GenevieveBuckley GenevieveBuckley marked this pull request as ready for review June 8, 2021 07:39
@GenevieveBuckley
Contributor Author

I have a mistake in my implementation that means any slice covering only part of the array causes an error on .compute(). I'd like to talk about how best to fix this when we talk next.

Eg: with this PR branch, I can do this:

import dask.array as da

arr = da.ones((6,6), chunks=(3,3))
result = arr[:6, :6]
result.compute()

but this gives me an error

import dask.array as da

arr = da.ones((6,6), chunks=(3,3))
result = arr[:5, :5]  # only some of the whole array is sliced
result.compute()  # KeyError: ('ones-9321b8a9f4b4fe7e5d4e0edb7273ca60', (0, 1))
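The culling logic has to keep every input chunk the slice actually touches, not only the chunks whose index matches an output index. A hypothetical helper (not the PR's code) shows why the second chunk along each axis is still needed for arr[:5, :5] on (3, 3) chunks:

```python
import math


def chunks_touched_1d(chunk_size, n_chunks, sl):
    """Which chunk indices along one axis a step-1 slice reads (uniform chunks)."""
    start = sl.start or 0
    stop = sl.stop if sl.stop is not None else chunk_size * n_chunks
    first = start // chunk_size
    last = math.ceil(stop / chunk_size) - 1
    return list(range(first, last + 1))


# arr = da.ones((6, 6), chunks=(3, 3)); arr[:5, :5]
# the slice stops at 5, inside the second chunk, so chunk index 1 is needed
assert chunks_touched_1d(3, 2, slice(0, 5)) == [0, 1]
# arr[:3, :3] would only need chunk 0 along each axis
assert chunks_touched_1d(3, 2, slice(0, 3)) == [0]
```

If culling drops chunk 1 here, the materialized graph is missing a key that the getitem task depends on, which is consistent with the KeyError above.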

@GenevieveBuckley
Contributor Author

Update:

  1. Unpacking parts from its tuple has helped get rid of some errors
  2. Now the errors happen inside _execute_task. I find debugging from here tricky to do.

Here's a little experiment I did, putting a breakpoint in before this line in _execute_task:

return func(*(_execute_task(a, cache) for a in args))

Dataframe shuffle example - working

This is the dataframe shuffle example - it works fine:

from dask.datasets import timeseries
ddf = timeseries().shuffle("id", shuffle="tasks")
ddf.compute()

Here are all the relevant values in the _execute_task function at this point:

(Pdb) func
subgraph_callable-eff735a4-d0d1-46bd-8447-428f69188257

(Pdb) cache
{}

(Pdb) args
arg = (subgraph_callable-eff735a4-d0d1-46bd-8447-428f69188257, ([Timestamp('2000-01-07 00:00:00', freq='D'), Timestamp('2000-01-08 00:00:00', freq='D')], 757507710))
cache = {}
dsk = None

(Pdb) for i, a in enumerate(args): print(f"{i}. {a}")
0. ([Timestamp('2000-01-04 00:00:00', freq='D'), Timestamp('2000-01-05 00:00:00', freq='D')], 1156703389)

Array slicing example - broken

This is the array slicing example - it's broken, I need to figure out what went wrong

import dask.array as da
x = da.ones((9, 9), chunks=(3, 3))
y = x[5:, 5:]
y.compute()

Here are all the relevant values in the _execute_task function at this point:

(Pdb) func
<built-in function getitem>

(Pdb) cache
{}

(Pdb) args
arg = (<built-in function getitem>, ('ones-ddadc429cabbff50070648bf5d89b3ac', 1, 1), (slice(2, 3, 1), slice(2, 3, 1)))
cache = {}
dsk = None

(Pdb) for a in args: print(a)
('ones-ddadc429cabbff50070648bf5d89b3ac', 1, 1)
(slice(2, 3, 1), slice(2, 3, 1))

(Pdb) for i, a in enumerate(args): print(f"{i}. {a}")
0. ('ones-ddadc429cabbff50070648bf5d89b3ac', 1, 1)
1. (slice(2, 3, 1), slice(2, 3, 1))

Possible leads

It seems pretty fishy that there is one item in args for the working dataframe example, but there are two items in args for the array example. Perhaps that has something to do with why it's broken.

@GenevieveBuckley
Contributor Author

GenevieveBuckley commented Jun 16, 2021

The getitem syntax we need to achieve is:

from operator import getitem
import dask.array as da

arr = da.ones((6,6), chunks=(3,3))
getitem(arr, (slice(5, None, None), slice(5, None, None))).compute()
# array([[1.]])

Which is equivalent to:

import dask.array as da

arr = da.ones((6,6), chunks=(3,3))
arr[5:, 5:].compute()
# array([[1.]])

@GenevieveBuckley
Contributor Author

Another debugging experiment: comparing the dask main branch to the slicing-HLG branch.

Setup

I used an array example this time.

import numpy as np
import dask.array as da

x = da.from_array(np.arange(16).reshape((4,4)), chunks=(2,2))
y = x[3:, 3:]
y.compute()

What happened?

  • On the main branch, there is no error when y is computed
  • On the slicing-HLG branch, we get a TypeError at line 121 of dask/core.py, in the _execute_task function.
    return func(*(_execute_task(a, cache) for a in args))
    TypeError: tuple indices must be integers or slices, not tuple
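One way to reproduce that exact error message in isolation: if the unresolved key tuple ends up where the chunk's data should be, getitem indexes a tuple with a tuple of slices:

```python
# The failing task reduces to indexing the key tuple itself with the
# index tuple, instead of first resolving the key to its chunk data:
key = ('ones-ddadc429cabbff50070648bf5d89b3ac', 1, 1)
index = (slice(2, 3, 1), slice(2, 3, 1))

try:
    key[index]
    raised = None
except TypeError as exc:
    raised = str(exc)

assert raised is not None and "not tuple" in raised
```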

What else is different?

I put a breakpoint in before line 121 of dask/core.py and compared what was going on for the two different branches

  1. The materialized task graph dictionary for the high level graph layer looks exactly the same for the main and slicing-hlg branches. I looked at this using:
    from pprint import pprint
    hlg = y.dask
    pprint(hlg.to_dict())
Code output:
{('array-d52eb27cbf828b7e8f8f94b1a6890e54', 0, 0): array([[0, 1],
       [4, 5]]),
 ('array-d52eb27cbf828b7e8f8f94b1a6890e54', 0, 1): array([[2, 3],
       [6, 7]]),
 ('array-d52eb27cbf828b7e8f8f94b1a6890e54', 1, 0): array([[ 8,  9],
       [12, 13]]),
 ('array-d52eb27cbf828b7e8f8f94b1a6890e54', 1, 1): array([[10, 11],
       [14, 15]]),
 ('getitem-bfd48e62fd5701fbd00c89a4c4b68984', 0, 0): (<built-in function getitem>,
                                                      ('array-d52eb27cbf828b7e8f8f94b1a6890e54',
                                                       1,
                                                       1),
                                                      (slice(1, 2, 1),
                                                       slice(1, 2, 1)))}

I had really expected to see something different with the brackets, particularly around the slices. The main branch code execution passes through the same line of code that creates an error on the slicing branch, so it's not like the code flow is different.

  2. The task dictionaries appear to be stored in different places:
    • In the __dict__['mapping'] attribute of the layer, for the main branch (layer type is MaterializedLayer)
    • In the _cached_dict attribute of the layer, for the slicing-HLG branch (layer type is SlicingLayer)
  3. The cache dictionary was an empty dictionary for the slicing-HLG branch, but had values in it on the main branch:
    (Pdb) cache
    {('array-d52eb27cbf828b7e8f8f94b1a6890e54', 1, 1): np.array([[10, 11], [14, 15]])}
    
    I didn't expect this since the cache is also an empty dictionary for our working dataframe shuffle example. But maybe it's not supposed to be empty, this could be a big clue.

Fixes I tried

I tried manually editing _execute_task to make the cache dictionary variable contain the same thing as it does in the main branch at this point. To do this, I added this line before line 121 in _execute_task:

        cache = {('array-d52eb27cbf828b7e8f8f94b1a6890e54', 1, 1): np.array([[10, 11], [14, 15]])}

Note: Obviously this change will break every other dask computation, but if that meant I could run this array example then I'd have a solid lead on what we need to fix.

What happened was kind of odd. Calling y.compute() no longer hit an error, but the output wasn't array([[15]]) like we expected; instead the output was 'e0bcbf08d16c97f629562504a0d04198'. It's a string containing what looks like a token, and it isn't the token belonging to any earlier part of the dask computation - this one is new.

@GenevieveBuckley
Contributor Author

GenevieveBuckley commented Jun 16, 2021

The high level graph layer dependencies are the same for both (but that doesn't rule out me making a mistake when I'm trying to use them to generate the fully materialized task graph)

>>> hlg.dependencies

{'array-d52eb27cbf828b7e8f8f94b1a6890e54': set(),
 'getitem-bfd48e62fd5701fbd00c89a4c4b68984': {'array-d52eb27cbf828b7e8f8f94b1a6890e54'}}

Also, the reported dependencies on the layers look correct (and are the same for both the main and the slicing-HLG branches)

>>> hlg.keys()

dict_keys([('array-d52eb27cbf828b7e8f8f94b1a6890e54', 0, 0), ('array-d52eb27cbf828b7e8f8f94b1a6890e54', 0, 1), ('array-d52eb27cbf828b7e8f8f94b1a6890e54', 1, 0), ('array-d52eb27cbf828b7e8f8f94b1a6890e54', 1, 1), ('getitem-bfd48e62fd5701fbd00c89a4c4b68984', 0, 0)])

>>> key = ('getitem-bfd48e62fd5701fbd00c89a4c4b68984', 0, 0)
>>> layer1.get_dependencies(key, hlg.keys())

{('array-d52eb27cbf828b7e8f8f94b1a6890e54', 1, 1)}

I'm a bit stumped, and would appreciate any suggestions for things to try next.

@GenevieveBuckley
Contributor Author

Progress update:

I have changed the cull dependencies code so that we have a parts_in and a parts_out. That means this simple example works now:

import numpy as np
import dask.array as da

x = da.from_array(np.arange(16).reshape((4,4)), chunks=(2,2))
y = x[3:, 3:]
y.compute()

TODO next:

  1. I still need to make a keys method that does not materialize the whole task graph
  2. There are a bunch of failing tests, mostly for slightly more complicated types of slicing. I think that means I'll also need to revisit how we "bubble up" the information from the slicing layer to fancier kinds of slicing operations. (I think we said that as a first step we'd just try to materialize the whole task graph in these cases, then work on improving that later if needed)

@GenevieveBuckley
Copy link
Contributor Author

@ian-r-rose & @gjoseph92 I think this is the wrong approach. Here's why I think this:

  • Slicing is slow, this is the problem we want solved
  • We thought a HLG might speed things up by not generating unnecessary tasks in the first place
  • But the _slice_1d function is already skipping over the chunks we don't need for the slices (so there's not much we're gaining anyway)
  • And to calculate the keys and output parts properly, we really need the itertools product list comprehensions (so we're not delaying much computation either)
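The point about _slice_1d can be checked with a rough pure-Python sketch of its behaviour (a hypothetical re-implementation for step-1 slices, not dask's actual code): only the blocks overlapping the slice appear in the result, so untouched chunks are never generated.

```python
def slice_1d_sketch(dim_len, chunk_lens, sl):
    """Rough sketch of _slice_1d's output: a dict mapping only the needed
    block numbers to the slice to apply within each block (step-1 only)."""
    start, stop, _step = sl.indices(dim_len)
    out, offset = {}, 0
    for block, length in enumerate(chunk_lens):
        lo, hi = max(start, offset), min(stop, offset + length)
        if lo < hi:
            # translate to block-local coordinates
            out[block] = slice(lo - offset, hi - offset, 1)
        offset += length
    return out


# a length-100 axis in four chunks of 25; blocks 2 and 3 are skipped entirely
assert slice_1d_sketch(100, [25, 25, 25, 25], slice(10, 30)) == {
    0: slice(10, 25, 1),
    1: slice(0, 5, 1),
}
```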

I think the problem we want to solve is in a different place. I'm going to test something out & then write a short summary on that.

@GenevieveBuckley
Contributor Author

I ran the test, unfortunately I don't have anything good to report here. We'll talk about it at our meeting tomorrow (or today, by the time you read this 🙂 )

Collaborator

@ian-r-rose ian-r-rose left a comment

This is looking very good.

dask/layers.py Outdated
out_name,
in_name,
blockdims,
new_blockdims,
Collaborator

This argument isn't currently used, though it could be used to implement get_output_keys(), I think.

Or parts_out could do it, which may be simpler. That would allow you to get rid of new_blockdims here.

dask/layers.py Outdated
This method does not require graph materialization.
"""
deps = defaultdict(set)
parts_out = parts_out or self._keys_to_parts(keys)
Collaborator

Suggested change
parts_out = parts_out or self._keys_to_parts(keys)
parts_out = parts_out or self._keys_to_output_parts(keys)

Collaborator

Since this code path isn't really used, we could also just make the arguments non-optional. In general I prefer non-nullable arguments unless there is a compelling reason for them

dask/layers.py Outdated
Comment on lines +180 to +188
def _input_parts(self):
"""Simple utility to get input chunk indices."""
parts_in = set()
block_slices = self._get_block_slices(self.shape, self.blockdims, self.index)
_part = []
for i in block_slices:
_part += sorted(i.keys())
parts_in.add(tuple(_part))
return parts_in
Collaborator

This is very nice.

Contributor Author

Sadly, no. This is the bit that's currently broken & I can't see a way to fix it without using the itertools product. (I know I said I fixed it, I was wrong. It only happened to work well for the one specific example I used as a test case)

I might have explained myself poorly today, perhaps this summary helps:

  • You can see how this implementation breaks things by choosing any slicing example that returns values from more than one input chunk, e.g.:
import numpy as np
import dask.array as da

x = da.from_array(np.arange(16).reshape((4,4)), chunks=(2,2))
y = x[1:, 1:]
y.compute()  # fails with this implementation
  • The itertools product function is needed so we list all of the necessary input chunks (all the combinations from our 1d chunk results).
  • This means there's not a lot of logic left that only happens in the _construct_graph function. That's why I was concerned about how useful this is (but I hadn't been considering your point about the benefits for nested slicing)

Contributor Author

I've just pushed a commit that replaces

    def _input_parts(self):
        """Simple utility to get input chunk indices."""
        parts_in = set()
        block_slices = self._get_block_slices(self.shape, self.blockdims, self.index)
        _part = []
        for i in block_slices:
            _part += sorted(i.keys())
        parts_in.add(tuple(_part))
        return parts_in

with this instead

    def _input_parts(self):
        """Simple utility to get input chunk indices."""
        block_slices = self._get_block_slices(self.shape, self.blockdims, self.index)
        parts_in = set(product(*[sorted(i.keys()) for i in block_slices]))
        return parts_in

Now I can make slices over multiple chunks again (yay, less broken), but it's not a great sign for the performance improvements we were hoping to get out of this. I haven't benchmarked anything yet though.
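The itertools.product step looks roughly like this (illustrative values, not the PR's actual data): each dimension's 1-d slicing result contributes its block numbers, and the full set of input chunks is every combination across dimensions.

```python
from itertools import product

# Per-dimension 1-d slicing results: block number -> slice within that block
block_slices = [
    {0: slice(1, 2, 1), 1: slice(0, 2, 1)},  # rows: blocks 0 and 1 touched
    {1: slice(0, 2, 1)},                     # cols: block 1 only
]

# every combination of touched blocks across dimensions
parts_in = set(product(*[sorted(d.keys()) for d in block_slices]))
assert parts_in == {(0, 1), (1, 1)}
```

This is also why the combinatorial work can't easily be deferred: listing the input chunks requires the full product.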

@GenevieveBuckley
Contributor Author

So python sets/dicts do not preserve order. Oops. Mucking that up is a large part of why I've twisted myself up into knots and wasted a lot of time.

Now I can focus on the optimization. Currently there is one failing test, which I think is testing whether a nested slice of slices has the task graph optimized correctly. Obviously, that's the bit I need to do next.

pytest dask/array/tests/test_array_core.py::test_to_delayed_optimize_graph

@GenevieveBuckley
Contributor Author

BTW, I played around a little with how to combine multiple nested slices into a single slice operation. (Note: there's still an error here that you'll see if you stride forwards and backwards with the slicing; I'll need to take into account the array length and remainder to get this part correct.)

https://gist.github.com/GenevieveBuckley/f913cef861a918c2e1c94862e7bdbc60
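For positive steps, the composition can be sketched like this (a hypothetical helper, not the gist's code; negative strides need the extra length/remainder handling mentioned above):

```python
def compose_slices(outer_len, first, second):
    """Combine arr[first][second] into a single slice (positive steps only)."""
    f_start, f_stop, f_step = first.indices(outer_len)
    # length of the intermediate view arr[first] (ceiling division)
    inner_len = max(0, -(-(f_stop - f_start) // f_step))
    s_start, s_stop, s_step = second.indices(inner_len)
    # map the inner slice's endpoints back into outer coordinates
    return slice(f_start + s_start * f_step,
                 f_start + s_stop * f_step,
                 f_step * s_step)


a = list(range(1000))
combined = compose_slices(1000, slice(100, 200), slice(10, 20))
assert a[combined] == a[100:200][10:20]

# also works when the outer slice has a step
combined = compose_slices(1000, slice(0, 100, 2), slice(5, 10))
assert a[combined] == a[0:100:2][5:10]
```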

@dhirschfeld

So python sets/dicts do not preserve order

Sets don't, but dictionaries do - it's mandated by the language spec since py37 (and was an implementation detail in py36)


https://docs.python.org/3.7/library/stdtypes.html#dict.values
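A quick demonstration of the distinction:

```python
# dicts preserve insertion order (guaranteed since Python 3.7)
d = {}
for key in ("b", "a", "c"):
    d[key] = None
assert list(d) == ["b", "a", "c"]

# sets make no ordering guarantee, so sort explicitly when order matters
s = {"b", "a", "c"}
assert sorted(s) == ["a", "b", "c"]
```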

@GenevieveBuckley
Contributor Author

Thanks @dhirschfeld. I remember assuming ordered dictionaries while working on a project with python 3.6, and hadn't realized it had changed since then. That's good to hear!

@GenevieveBuckley
Contributor Author

I'm not seeing any kind of speed improvement, and I imagine I would be seeing some kind of change by now if what we're doing here were an effective approach.

(benchmark timings screenshot: slicing-HLG-benchmarking)

import dask.array as da

a = da.ones(1_000, chunks=1)
%timeit a[10:20].compute()

a = da.ones(10_000, chunks=1)
%timeit a[10:20].compute()

a = da.ones(100_000, chunks=1)
%timeit a[10:20].compute()

# and so on...

Last time we spoke @ian-r-rose and @gjoseph92 suggested this work might eventually be useful for a nested "slice into a slice" situation. There'd need to be a bit more work done for that to happen, see #7655.

But the reason I started this was because we wanted to make map_overlap faster and profiling showed there was a lot of time spent in slicing (not a nested slice of slices situation). Given that this PR doesn't seem to support that goal, I'm leaning towards scrapping/abandoning this. Comments are welcome, please weigh in.

@mrocklin
Member

Quick comment, probably wrong. I'm not tracking this PR well enough to speak with any sort of authority, but this caught my eye:

this work might eventually be useful for a nested "slice into a slice" situation

Just so people are aware, implementing all of the corner cases for numpy-style indexing is very complex. I would recommend that people here not start walking in this direction. Instead there is a big pile of code in slicing.py which, although inscrutable, appears to be pretty much correct. I would love for us to find a way to reuse or wrap this code without having to reimplement it.

@gjoseph92
Collaborator

I'm not seeing any kind of speed improvement

Particularly since this is implemented in the "reuse or wrap this code without having to reimplement it" fashion, I'm not really surprised that end-to-end performance is the same. However, I would hope that

%timeit a[10:20]

is slightly faster than on main. And I'd expect the compute() part to be commensurately slower. Basically, I'd expect the time saved in constructing the slice to be moved into collections_to_dsk, which is called within compute to materialize the graph. And that savings is meaningful. Particularly if a[:-2] is faster, and if it (someday) saves us having to send an entire slicing graph over the wire to the scheduler.

I'm leaning towards scrapping/abandoning this

Even if we don't see immediate performance improvements, I think there's value in a high-level Slicing layer. I don't see how we can one day turn off low-level optimizations for Array without moving slicing into HighLevelGraph layers. In particular, we'll need optimize_slices to work without materializing the graph. (This is the "slice into a slice" case we're talking about I assume.) I imagine that having a Slice layer would be necessary to make that optimization possible on a HLG. Indeed, there's a chance it might actually be easier to implement that way than the current introspection of the low-level tasks.

@bmerry
Contributor

bmerry commented Jul 1, 2021

And I'd expect the compute() part to be commensurately slower. Basically, I'd expect the time saved in constructing the slice to be moved into collections_to_dsk, which is called within compute to materialize the graph.

I haven't been following the details of this PR, but just wanted to chip in to say that this would be pretty valuable for us (as well as doing slice-of-slice optimisations at the high level). We currently rely on some hacks like cached properties that construct slices lazily so that we can avoid paying the construction cost if a sliced array never actually gets used.
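The lazy-construction hack bmerry describes can be sketched with functools.cached_property (a hypothetical illustration, not their actual code): the slicing cost is only paid if the sliced array is actually used.

```python
from functools import cached_property


class LazySlice:
    """Hypothetical sketch: defer building an expensive slice until first use."""

    def __init__(self, arr, index):
        self.arr = arr
        self.index = index

    @cached_property
    def value(self):
        # the (potentially expensive) slice is built on first access only,
        # then cached; an unused LazySlice costs almost nothing
        return self.arr[self.index]


view = LazySlice(list(range(100)), slice(10, 20))
# nothing has been sliced yet; first access triggers and caches the work
assert view.value == list(range(10, 20))
```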

@GenevieveBuckley
Contributor Author

I'm not really surprised that end-to-end performance is the same. However, I would hope that

%timeit a[10:20]

is slightly faster than on main. And I'd expect the compute() part to be commensurately slower.

Ah, of course - that was a silly thing for me to do, thanks for pointing it out.

Here we go, looking at just the graph construction stage. (Just to double check my understanding, this is graph construction, but not optimization or computation, do I have that right?)

%timeit a[10:20]

(benchmark screenshot: slicing-benchmark-1)

And even though we haven't specifically done anything special for slices of slices in this PR, I thought it might be fun to look at those times too.

%timeit a[100:200][10:20]

(benchmark screenshot: slicing-benchmark-2)

... and a slice of a slice of a slice...

%timeit a[100:200][0:50][10:20]

(benchmark screenshot: slicing-benchmark-3)

@GenevieveBuckley
Contributor Author

@bmerry is there a specific example you'd like us to look at/benchmark, just to keep your use case in mind while I work on this?

@GenevieveBuckley
Contributor Author

Also, I've just been playing with Emil's benchmark script from here.

Comparing the slicing-HLG branch from this PR to the Dask main branch, there is a slight speed improvement generating the initial task graph.

It's a difference of 24.7 milliseconds, which is approximately 5.7% faster.

main branch

src.shape = (2048, 2048, 2048)
src.chunks = (64, 64, 64)
src.nbytes / (2**30) = 8.0
create dask array 'a' from zarr array 'src': 0.037327215 seconds
len(a.__dask_graph__()) = 32769
map_blocks dask array 'a' to dask array 'b': 0.000756337999 seconds
len(b.__dask_graph__()) = 65537
map_overlap dask array 'a' to dask array 'c': 0.433811585 seconds
len(c.__dask_graph__()) = 994425

slicing-HLG branch

src.shape = (2048, 2048, 2048)
src.chunks = (64, 64, 64)
src.nbytes / (2**30) = 8.0
create dask array 'a' from zarr array 'src': 0.037867653 seconds
len(a.__dask_graph__()) = 32769
map_blocks dask array 'a' to dask array 'b': 0.000736053 seconds
len(b.__dask_graph__()) = 65537
map_overlap dask array 'a' to dask array 'c': 0.40901264 seconds
len(c.__dask_graph__()) = 994425

@bmerry
Contributor

bmerry commented Jul 2, 2021

@bmerry is there a specific example you'd like us to look at/benchmark, just to keep your use case in mind while I work on this?

I'm not sure there is really anything specific - I don't have a standalone benchmark. I guess one aspect of our use case is that we have a multi-dimensional array, and if the graph construction can be made to take O(sum(num_chunks)) rather than O(product(num_chunks)) then I'd expect it to be a game-changer. While any performance improvement is appreciated, 5-10% improvements aren't going to remove our need to delay slicing until it's needed.
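The scaling gap described here can be made concrete with hypothetical numbers: for a slice touching 10 chunks along each of 3 dimensions, per-combination work and per-dimension work differ by orders of magnitude.

```python
from math import prod

# hypothetical: a slice touching 10 chunks along each of 3 dimensions
touched_per_dim = [10, 10, 10]

# construction cost proportional to every chunk combination
assert prod(touched_per_dim) == 1000
# vs. cost proportional to the per-dimension slicing results alone
assert sum(touched_per_dim) == 30
```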

@gjoseph92 gjoseph92 mentioned this pull request Jul 20, 2021
3 tasks
@github-actions github-actions bot added the needs attention It's been a while since this was pushed on. Needs attention from the owner or a maintainer. label Oct 21, 2024
