Blockwise array creation redux #7417
Conversation
@rjzamora @jrbourbeau your work on import checks is already paying dividends: https://github.com/dask/dask/pull/7417/checks?check_run_id=2149288104
rjzamora
left a comment
There was a problem hiding this comment.
Thanks for working on this @ian-r-rose ! I know this is still WIP, but I left some initial thoughts since I was excited to take a look.
dask/layers.py
Outdated
```python
chunk_shape = tuple(chunk[i] for i, chunk in zip(idx, self.chunks))
array_location = tuple(
    (start[i], start[i + 1]) for i, start in zip(idx, self.starts)
)
return {
    "shape": self.shape,
    "num-chunks": self.num_chunks,
    "array-location": array_location,
    "chunk-shape": chunk_shape,
}
```
Perhaps we should make it possible to avoid unnecessary logic for "simple" cases like zeros/full/ones?
This block_info dict is modeled after that in Array.map_blocks (though it's not identical, since that can include dtypes). My thinking was that, since they are similar operations, the API should be similar, and similar data should be provided to the tasks.
That being said, it's true that in simpler cases there is some unnecessary logic here when constructing the block info. We could revert these changes to keep CreateArrayDeps simple, and then make a new blockwise layer for things like from_array. So to me it seems the choice is between having a larger number of more specific layer classes, or a smaller number of more general ones like this one.
Or __init__ could take something like block_info=True to indicate that it should provide more than just the chunk shape. I generally like to have more predictable outputs for functions, but can be talked out of it :)
I generally like to have more predictable outputs for functions, but can be talked out of it :)
Makes sense - I do prefer consistency (as you already have it), so I'm not really motivated to talk you out of this unless we discover a need for performance optimizations.
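To make the discussion concrete, here is a minimal sketch (hypothetical function and values, not code from this PR) of how a creation function might consume a `block_info` dict with the keys shown in the snippet above; a "simple" case like `zeros` only ever touches `"chunk-shape"`:

```python
import numpy as np

def create_chunk(block_info):
    # Hypothetical creation function consuming the block_info dict
    # discussed above (keys taken from the snippet).
    shape = block_info["chunk-shape"]  # shape of this chunk
    # array-location gives the (start, stop) slice of the full array
    # per dimension; a zeros/full-style function never needs it.
    return np.zeros(shape)

info = {
    "shape": (4, 4),
    "num-chunks": (2, 2),
    "array-location": ((0, 2), (0, 2)),
    "chunk-shape": (2, 2),
}
chunk = create_chunk(info)
```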
dask/layers.py
Outdated
```python
module,
name,
chunks,
serialize(seeds),
```
It may be easiest to separately serialize each seed, so that we can still do `block_info["seed"] = self.seeds[n]` in the getitem definition without needing any `__dask_distributed_unpack__` logic in this class. However, we would then need to wrap the underlying creation function to handle deserialization of `block_info["seed"]`.
If we do use a function wrapper, it probably makes sense to fully move the random_state_data operation from the client to the worker (and avoid the need to ship the seeds through the scheduler altogether). Do we know any RNG experts? Can we simply shift the default/random seed by some factor related to the chunk index?
Yeah, I haven't fully thought through the serialization story here. But at least, I think that serialize does drill down into the list and serialize the seeds separately, c.f. https://github.com/dask/distributed/blob/bef0308962345303123aba7ec6730757a61a4dfc/distributed/protocol/serialize.py#L263-L284 , and it shouldn't be necessary to deserialize them on the scheduler.
It would definitely be nice to be able to move a smaller representation of the random state to the workers, but at least this implementation doesn't ship more seed data than the current implementation.
But at least, I think that serialize does drill down into the list and serialize the seeds separately, c.f. https://github.com/dask/distributed/blob/bef0308962345303123aba7ec6730757a61a4dfc/distributed/protocol/serialize.py#L263-L284 , and it shouldn't be necessary to deserialize them on the scheduler.
Ah - you are correct that `serialize` will internally handle the elements of `seeds` separately. I hadn't considered the possibility of simply splitting the header/frames on the scheduler, but that may actually be a good way to go.
Regarding the question of moving random state generation to the workers: I like the idea, but I also don't think you should worry about it in this PR.
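On the RNG question above, one possibility (a sketch of the idea, not what this PR implements) is NumPy's `SeedSequence`, which can derive statistically independent streams from a single root seed plus the flat chunk index, so only one integer would need to be shipped to the workers:

```python
import numpy as np

def chunk_random(root_seed, flat_chunk_index, shape):
    # Derive an independent stream for this chunk from the root seed
    # and the chunk's position; no per-chunk seed data is shipped.
    ss = np.random.SeedSequence([root_seed, flat_chunk_index])
    rng = np.random.default_rng(ss)
    return rng.random(shape)

a = chunk_random(42, 0, (2, 3))
b = chunk_random(42, 1, (2, 3))
```

This is deterministic per (seed, chunk) pair, and `SeedSequence` is designed so that sibling streams are independent, unlike naively shifting the seed by the chunk index.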
I was just experimenting with this. It seems that we will need to add an explicit `iterate_collection=` argument to `serialize`. Otherwise, we cannot guarantee that the scheduler will be able to access individual elements of a list/dict without needing to deserialize. I can/will submit a PR to distributed to add this feature, but this means a proper `__dask_distributed_unpack__` solution here will not work with the latest release of distributed.
I can/will submit a PR to distributed to add this feature, but this means a proper `__dask_distributed_unpack__` solution here will not work with the latest release of distributed.
On second thought, I don't think it makes sense to do this. In general, it seems like a much cleaner solution to just call `serialize` separately on each element on the client, rather than adding an extra step of splitting the header/frames on the scheduler.
On second thought, I don't think it makes sense to do this. In general, it seems like a much cleaner solution to just call `serialize` separately on each element on the client,
Yep, after taking another look at it, I agree with you
rather than adding an extra step of splitting the header/frames on the scheduler.
Can you elaborate more on what you mean here?
In general, I'm having a tough time reasoning about how deserialization should look on the worker. What would the idiomatic way of doing that look like? i.e.,
- How does one identify whether one should deserialize? If it is coming back as `distributed.protocol.Serialized`, what would be the best way to identify that without relying on `distributed` being installed?
- I ran into a couple of problems around making sure the numpy deserializer is registered on the worker. What am I doing wrong?

(I'm not demanding answers to the above, just what I'm thinking about :) )
rather than adding an extra step of splitting the header/frames on the scheduler.
Can you elaborate more on what you mean here?
If you call serialize on an entire collection, you will get a (header, frames) tuple, and so the scheduler will need to process/split the header and then convert this single tuple to a list of (header, frame) tuples. The logic is not that complicated, but it is messier than simply serializing the individual elements to begin with.
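The tradeoff can be illustrated with `pickle` standing in for distributed's `serialize` (which similarly returns an opaque header-plus-frames payload): serializing the whole collection yields one blob the scheduler would have to split apart, while serializing per element lets the scheduler forward the payload for a single chunk untouched:

```python
import pickle

seeds = [11, 22, 33]

# One call on the whole collection: a single blob; extracting
# element n later means deserializing (or splitting) everything.
whole = pickle.dumps(seeds)

# One call per element: the payload for chunk n can be routed to
# the right task without ever deserializing it in between.
frames = [pickle.dumps(s) for s in seeds]
```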
In general, I'm having a tough time reasoning about how deserialization should look on the worker. What would the idiomatic way of doing that look like? i.e.,
I have experimented with ways to do this, but I haven't established anything that I would call idiomatic yet...
- How does one identify whether one should deserialize? If it is coming back as `distributed.protocol.Serialized`, what would be the best way to identify that without relying on `distributed` being installed?
You have certainly isolated the crux of the problem here. It is easy to decide when to serialize (within a `__dask_distributed_pack__` definition). However, it is not obvious when the worker will need to deserialize one or all of its arguments. In my first pass, I was relying on pickle rather than serialize/deserialize, so the function simply needed to check if the argument(s) in question was a `bytes` object. In #7415, I just moved over to serialize/deserialize, and I am relying on an explicit "serialized" label. I don't "love" that solution, but it seems to work.
- I ran into a couple of problems around making sure the numpy deserializer is registered on the worker. What am I doing wrong?
Not sure - but I'm hoping we won't need to do anything like this to check if the data is serialized.
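A rough sketch of the explicit-label approach described above (names are hypothetical, and `pickle` stands in for distributed's deserialize): arguments arrive either raw or as a `("serialized", payload)` pair, and the wrapper only unpacks the labeled ones, so it never has to guess:

```python
import pickle

def maybe_deserialize(arg):
    # Only unpack arguments that were explicitly labeled when packed;
    # everything else passes through untouched.
    if isinstance(arg, tuple) and len(arg) == 2 and arg[0] == "serialized":
        return pickle.loads(arg[1])
    return arg

def wrapped(func, *args):
    # Worker-side wrapper around the real task function.
    return func(*[maybe_deserialize(a) for a in args])

packed = ("serialized", pickle.dumps([1, 2, 3]))
```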
Yeah, I am probably doing something wrong, but I am finding that the first time I need to deserialize a numpy array on a worker it fails because that protocol hasn't been registered yet. Subsequent deserializations work fine.
I'm currently forcing the registration by relying on the side effect of this import, but I don't view that as a real solution. Perhaps @jrbourbeau has some insight as to what might be going wrong here?
This is very cool to see @ian-r-rose
James has made a quick fix for the failing sparse test in #7421, btw
```diff
-dsk = getem(
-    get_from,
+dsk = graph_from_arraylike(
```
@rjzamora wanted to flag this as where we'll have to think through some of the array inlining logic.
Got it. So, the original code allows you to include the array in the graph once (with a dedicated key), or to inline it in every task. So far, this PR is effectively inlining the array in every task by including it in the IO function (in graph_from_arraylike). Is that correct?
It seems like you could support `inline_array=False`, but you may need to further expand `CreateArrayDeps` to include optional args/kwargs that should be the same for all keys (and make it possible to specify these arguments in `BlockwiseCreateArray`). I guess if you can do this, you could avoid using partial to embed the array in the IO function altogether?
Got it. So, the original code allows you to include the array in the graph once (with a dedicated key), or to inline it in every task. So far, this PR is effectively inlining the array in every task by including it in the IO function (in `graph_from_arraylike`). Is that correct?

It seems like you could support `inline_array=False`, but you may need to further expand `CreateArrayDeps` to include optional args/kwargs that should be the same for all keys (and make it possible to specify these arguments in `BlockwiseCreateArray`). I guess if you can do this, you could avoid using partial to embed the array in the IO function altogether?
Yes, you have this exactly right -- I'm planning to re-introduce non-inlining, but haven't tackled it yet. For the case of lazy arrays like Zarr, or small ones, this should already do what we want.
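The two graph shapes under discussion can be sketched with plain task dicts (toy keys, not the real layer output): with inlining, the source array object is baked into every task; with a dedicated key, it appears once and the tasks reference that key:

```python
import operator
import numpy as np

arr = np.arange(4)
getter = operator.getitem

# Inlined: the array object is embedded in each task.
inlined = {
    ("x", 0): (getter, arr, slice(0, 2)),
    ("x", 1): (getter, arr, slice(2, 4)),
}

# Not inlined: one dedicated key, referenced by every task.
keyed = {
    "original-x": arr,
    ("x", 0): (getter, "original-x", slice(0, 2)),
    ("x", 1): (getter, "original-x", slice(2, 4)),
}
```

Inlining avoids an extra key and a scheduler round-trip per access, at the cost of shipping the (possibly large, or lazily-opened) array with every task, which is why lazy stores like Zarr are the easy case here.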
Thanks for the xref @GenevieveBuckley! Hopefully this will indeed help with some of their pain points (though not anything to do with
Thanks, I'll try to rebase
high-level graph. Will need to think about what happens when the user materializes client side.
dependencies for a random graph.
@ian-r-rose just double-checking, we're just waiting on #8542 to merge this, right?

Yes, that's right

Shall we rerun tests and merge?

I've just merged

All passing except for windows37
jrbourbeau
left a comment
Thanks @ian-r-rose @gjoseph92 @rjzamora
This is a small follow-up to #7417 which moves `import`s of `cached_cumsum` to be from `dask.utils` (its new location) instead of `dask.array.slicing` (where it used to be)
This is due to a change which was introduced in dask-2022.1.1. Seems to be coming from: dask/dask#7417 Related pull request: hyperspy#2888
Dask 2022.01.1 renamed `da.core.getem` to `da.core.graph_from_arraylike` and changed the interface (see dask/dask#7417). These functions are still the most convenient way to create a dask array from `get_chunk` calls and a chunk specification, so add a shim that picks the appropriate function. We also keep the more convenient `getem` API (I guess until the minimum version of dask for katdal no longer has `getem`).
This is an attempt to improve on the slight performance drop of `da.map_blocks`. Taking our cue from the internals of the new `da.graph_from_arraylike` function, we call `dask.blockwise.blockwise` directly. This allows us to construct a custom graph with three arguments to our putter function (adding the actual chunk to the usual two getter arguments). We need this because the returned graph is now a high-level one that is harder to modify than an old-school dict. This improves performance to equal that of the original getem version, even surpassing it when there are many chunks. The only downside is that classes like `ArraySliceDep` are brand new and might change again in the near future, having been added in the recent dask/dask#7417 PR.
Follow-on work to #6931 and #7281, supersedes #6984. This introduces more blockwise array creation routines, including `from_array`/`from_zarr`, and (at least a subset of) random arrays. Following #7381, I am avoiding numpy/pandas imports in all graph materialization routines, as well as deserializing any helper arrays.