Blockwise array creation redux #7417

Merged

jrbourbeau merged 54 commits into dask:main from ian-r-rose:blockwise-array-creation-redux
Jan 21, 2022

Conversation

@ian-r-rose
Collaborator

@ian-r-rose ian-r-rose commented Mar 19, 2021

Follow-on work to #6931 and #7281, supersedes #6984. This introduces more blockwise array creation routines, including from_array/from_zarr, and (at least a subset of) random arrays.

Following #7381, I am avoiding numpy/pandas imports in all graph materialization routines, as well as avoiding deserialization of any helper arrays.

  • Tests added / passed
  • Passes black dask / flake8 dask
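
To illustrate the intended effect (a minimal sketch, assuming a dask release that includes this change): with blockwise creation, from_array builds a single Blockwise layer instead of materializing one task per chunk up front.

import numpy as np
import dask.array as da

x = da.from_array(np.ones((8, 8)), chunks=4)

# With blockwise creation the graph contains a single Blockwise layer rather
# than one materialized task per chunk.
print(x.__dask_graph__().layers)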

@ian-r-rose ian-r-rose marked this pull request as draft March 19, 2021 14:29
@ian-r-rose ian-r-rose mentioned this pull request Mar 19, 2021
@ian-r-rose
Collaborator Author

@rjzamora @jrbourbeau your work on import checks is already paying dividends: https://github.com/dask/dask/pull/7417/checks?check_run_id=2149288104

Member

@rjzamora rjzamora left a comment

Thanks for working on this @ian-r-rose! I know this is still a WIP, but I left some initial thoughts since I was excited to take a look.

dask/layers.py Outdated
Comment on lines +52 to +61
chunk_shape = tuple(chunk[i] for i, chunk in zip(idx, self.chunks))
array_location = tuple(
(start[i], start[i + 1]) for i, start in zip(idx, self.starts)
)
return {
"shape": self.shape,
"num-chunks": self.num_chunks,
"array-location": array_location,
"chunk-shape": chunk_shape,
}
Member

Perhaps we should make it possible to avoid unnecessary logic for "simple" cases like zeros/full/ones?

Collaborator Author

This block_info dict is modeled after that in Array.map_blocks (though it's not identical, since that can include dtypes). My thinking was that, since they are similar operations, the API should be similar, and similar data should be provided to the tasks.

That being said, it's true that in simpler cases there is some unnecessary logic here when constructing the block info. We could revert these changes to keep CreateArrayDeps simple, and then make a new blockwise layer for things like from_array. So to me it seems the choice is between having a larger number of more specific layer classes, or a smaller number of more general ones like this one.
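
For reference, a minimal sketch of the block_info that Array.map_blocks passes to its function (the keys shown are the documented map_blocks ones; the dict built here is a subset of them):

import dask.array as da

def show_info(block, block_info=None):
    # block_info[None] describes the output block and includes keys such as
    # "shape", "num-chunks", "array-location", "chunk-location",
    # "chunk-shape", and "dtype".
    print(block_info[None]["array-location"])
    return block

da.ones((4, 4), chunks=2).map_blocks(show_info).compute()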

Collaborator Author

Or __init__ could take something like block_info=True to indicate that it should provide more than just the chunk shape. I generally like to have more predictable outputs for functions, but can be talked out of it :)

Member

I generally like to have more predictable outputs for functions, but can be talked out of it :)

Makes sense - I do prefer consistency (as you already have it), so I'm not really motivated to talk you out of this unless we discover a need for performance optimizations.
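
For concreteness, a hypothetical sketch of the block_info=True toggle floated above (the class name and details here are made up; the actual class in this PR may differ):

from itertools import accumulate
from dask.blockwise import BlockwiseDep

class CreateArrayDepsSketch(BlockwiseDep):
    produces_tasks = False

    def __init__(self, chunks, block_info=True):
        self.chunks = chunks
        self.numblocks = tuple(len(c) for c in chunks)
        self.block_info = block_info
        self.starts = [tuple(accumulate(c, initial=0)) for c in chunks]

    def __getitem__(self, idx):
        chunk_shape = tuple(chunk[i] for i, chunk in zip(idx, self.chunks))
        if not self.block_info:
            # cheap path for simple cases like zeros/ones/full
            return chunk_shape
        return {
            "shape": tuple(sum(c) for c in self.chunks),
            "num-chunks": self.numblocks,
            "array-location": tuple(
                (start[i], start[i + 1]) for i, start in zip(idx, self.starts)
            ),
            "chunk-shape": chunk_shape,
        }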

dask/layers.py Outdated
module,
name,
chunks,
serialize(seeds),
Member

It may be easiest to separately serialize each seed, so that we can still do block_info["seed"] = self.seeds[n] in the getitem definition without needing any __dask_distributed_unpack__ logic in this class. However, we would then need to wrap the underlying creation function to handle deserialization of block_info["seed"].

If we do use a function wrapper, it probably makes sense to fully move the random_state_data operation from the client to the worker (and avoid the need to ship the seeds through the scheduler altogether). Do we know any RNG experts? Can we simply shift the default/random seed by some factor related to the chunk index?
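
Not something this PR does, but for illustration, one possible shape of the "shift the seed by chunk index" idea is NumPy's SeedSequence, which can derive an independent stream per chunk on the worker from a single root seed:

import numpy as np

root = np.random.SeedSequence(12345)  # single root seed shipped with the graph

def chunk_rng(flat_block_index):
    # Each chunk derives a statistically independent child stream from the
    # root entropy plus its own block index.
    child = np.random.SeedSequence(entropy=root.entropy, spawn_key=(flat_block_index,))
    return np.random.default_rng(child)

chunk_rng(7).standard_normal(3)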

Collaborator Author

Yeah, I haven't fully thought through the serialization story here. But at least, I think that serialize does drill down into the list and serialize the seeds separately, c.f. https://github.com/dask/distributed/blob/bef0308962345303123aba7ec6730757a61a4dfc/distributed/protocol/serialize.py#L263-L284 , and it shouldn't be necessary to deserialize them on the scheduler.

It would definitely be nice to be able to move a smaller representation of the random state to the workers, but at least this implementation doesn't ship more seed data than the current implementation.

Member

@rjzamora rjzamora Mar 24, 2021

But at least, I think that serialize does drill down into the list and serialize the seeds separately, c.f. https://github.com/dask/distributed/blob/bef0308962345303123aba7ec6730757a61a4dfc/distributed/protocol/serialize.py#L263-L284 , and it shouldn't be necessary to deserialize them on the scheduler.

Ah - you are correct that serialize will internally handle the elements of seeds separately. I hadn't considered the possibility of simply splitting the header/frames on the scheduler, but that may actually be a good way to go.

Regarding the question of moving random state generation to the workers: I like the idea, but I also don't think you should worry about it in this PR.

Member

I was just experimenting with this. It seems that we will need to add an explicit iterate_collection= argument to serialize. Otherwise, we cannot guarantee that the scheduler will be able to access individual elements of a list/dict without needing to deserialize. I can/will submit a PR to distributed to add this feature, but this means a proper __dask_distributed_unpack__ solution here will not work with the latest release of distributed.

Member

@rjzamora rjzamora Mar 24, 2021

I can/will submit a PR to distributed to add this feature, but this means a proper __dask_distributed_unpack__ solution here will not work with the latest release of distributed.

On second thought, I don't think it makes sense to do this. In general, it seems like a much cleaner solution to just call serialize separately on each element on the client, rather than adding an extra step of splitting the header/frames on the scheduler.

Collaborator Author

On second thought, I don't think it makes sense to do this. In general, it seems like a much cleaner solution to just call serialize separately on each element on the client,

Yep, after taking another look at it, I agree with you.

rather than adding an extra step of splitting the header/frames on the scheduler.

Can you elaborate more on what you mean here?

In general, I'm having a tough time reasoning about how deserialization should look on the worker. What would the idiomatic way of doing that look like? i.e.,

  1. How does one identify whether one should deserialize? If it is coming back as distributed.protocol.Serialized, what would be the best way to identify that without relying on distributed being installed?
  2. I ran into a couple of problems around making sure the numpy deserializer is registered on the worker. What am I doing wrong?

(I'm not demanding answers to the above, just what I'm thinking about :) )

Member

rather than adding an extra step of splitting the header/frames on the scheduler.

Can you elaborate more on what you mean here?

If you call serialize on an entire collection, you will get a single (header, frames) tuple, and so the scheduler will need to process/split the header and then convert this single tuple into a list of (header, frames) tuples. The logic is not that complicated, but it is messier than simply serializing the individual elements to begin with.
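
Concretely, assuming distributed is installed, the two approaches look roughly like:

from distributed.protocol import serialize

seeds = [b"\x00" * 16, b"\x01" * 16, b"\x02" * 16]  # stand-in seed data

# Serializing the whole list yields one (header, frames) pair that the
# scheduler would then have to split apart per element:
header, frames = serialize(seeds)

# Serializing each element on the client yields a list of (header, frames)
# pairs the scheduler can forward to workers untouched:
packed = [serialize(s) for s in seeds]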

In general, I'm having a tough time reasoning about how deserialization should look on the worker. What would the idiomatic way of doing that look like? i.e.,

I have experimented with ways to do this, but I haven't established anything that I would call idiomatic yet...

  1. How does one identify whether one should deserialize? If it is coming back as distributed.protocol.Serialized, what would be the best way to identify that without relying on distributed being installed?

You have certainly isolated the crux of the problem here. It is easy to decide when to serialize (within a __dask_distributed_pack__ definition). However, it is not obvious when the worker will need to deserialize one or all of its arguments. In my first pass, I was relying on pickle rather than serialize/deserialize, so the function simply needed to check if the argument(s) in question was a bytes object. In #7415, I just moved over to serialize/deserialize, and I am relying on an explicit "serialized" label. I don't "love" that solution, but it seems to work.

  2. I ran into a couple of problems around making sure the numpy deserializer is registered on the worker. What am I doing wrong?

Not sure - but I'm hoping we won't need to do anything like this to check if the data is serialized.
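
For illustration only, a hypothetical version of that explicit-marker check on the worker side (the marker name and packing format here are invented, not necessarily what #7415 uses):

from distributed.protocol import deserialize

def maybe_deserialize(arg):
    # Hypothetical convention: packed arguments are tagged with a
    # "serialized" marker so the worker-side wrapper knows to deserialize.
    if isinstance(arg, tuple) and len(arg) == 3 and arg[0] == "serialized":
        _, header, frames = arg
        return deserialize(header, frames)
    return arg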

Collaborator Author

Yeah, I am probably doing something wrong, but I am finding that the first time I need to deserialize a numpy array on a worker it fails because that protocol hasn't been registered yet. Subsequent deserializations work fine.

I'm currently forcing the registration by relying on the side effect of this import, but I don't view that as a real solution. Perhaps @jrbourbeau has some insight as to what might be going wrong here?
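
For context, the kind of side-effect import being leaned on (shown only to illustrate the workaround; it may not be the exact module linked above):

# Importing this module registers distributed's numpy (de)serializers as a
# side effect.
import distributed.protocol.numpy  # noqa: F401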

@GenevieveBuckley
Contributor

This is very cool to see @ian-r-rose
We've been talking to the ilastik team about this recently #7404

@GenevieveBuckley
Contributor

James has made a quick fix for the failing sparse test in #7421, btw


dsk = getem(
get_from,
dsk = graph_from_arraylike(
Collaborator Author

@rjzamora wanted to flag this as where we'll have to think through some of the array inlining logic.

Member

Got it. So, the original code allows you to include the array in the graph once (with a dedicated key), or to inline it in every task. So far, this PR is effectively inlining the array in every task by including it in the IO function (in graph_from_arraylike). Is that correct?

It seems like you could support inline_array=False, but you may need to further expand CreateArrayDeps to include optional args/kwargs that should be the same for all keys (and make it possible to specify these arguments in BlockwiseCreateArray). I guess if you can do this, you could avoid using partial to embed the array in the IO function altogether?

Collaborator Author

Got it. So, the original code allows you to include the array in the graph once (with a dedicated key), or to inline it in every task. So far, this PR is effectively inlining the array in every task by including it in the IO function (in graph_from_arraylike). Is that correct?

It seems like you could support inline_array=False, but you may need to further expand CreateArrayDeps to include optional args/kwargs that should be the same for all keys (and make it possible to specify these arguments in BlockwiseCreateArray). I guess if you can do this, you could avoid using partial to embed the array in the IO function altogether?

Yes, you have this exactly right -- I'm planning to re-introduce non-inlining, but haven't tackled it yet. For the case of lazy arrays like Zarr, or small ones, this should already do what we want.
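
The knob eventually exposed to users looks like the following (a sketch; inline_array is available on da.from_array in released dask):

import numpy as np
import dask.array as da

x = np.arange(16).reshape(4, 4)

# The source array is stored once in the graph under a dedicated key, and
# every getter task references that key:
a = da.from_array(x, chunks=2, inline_array=False)

# The source array is embedded directly into each getter task instead:
b = da.from_array(x, chunks=2, inline_array=True)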

@ian-r-rose
Collaborator Author

This is very cool to see @ian-r-rose
We've been talking to the ilastik team about this recently #7404

Thanks for the xref @GenevieveBuckley! Hopefully this will indeed help with some of their pain points (though not anything to do with map_overlap for the time being).

James has made a quick fix for the failing sparse test in #7421, btw

Thanks, I'll try to rebase

@ian-r-rose ian-r-rose force-pushed the blockwise-array-creation-redux branch from ed49ca1 to 474eff4 on January 12, 2022 at 23:26
@gjoseph92
Collaborator

@ian-r-rose just double-checking, we're just waiting on #8542 to merge this right?

@ian-r-rose
Collaborator Author

@ian-r-rose just double-checking, we're just waiting on #8542 to merge this right?

Yes, that's right

@gjoseph92
Collaborator

Shall we rerun tests and merge?

@ian-r-rose
Collaborator Author

I've just merged main to pick up #8542. I suspect we'll want to have this on main for a bit before release, but I would defer to @jrbourbeau on that.

@ian-r-rose
Collaborator Author

All passing except for the windows 3.7 CI job

Member

@jrbourbeau jrbourbeau left a comment

@jrbourbeau jrbourbeau merged commit 57f5cc3 into dask:main Jan 21, 2022
jsignell pushed a commit that referenced this pull request Jan 25, 2022
This is a small follow-up to #7417, which moves `import`s of `cached_cumsum` to be from `dask.utils` (its new location) instead of `dask.array.slicing` (where it used to be).
maartenbreddels added a commit to vaexio/vaex that referenced this pull request Jan 31, 2022
maartenbreddels added a commit to vaexio/vaex that referenced this pull request Jan 31, 2022
maartenbreddels added a commit to vaexio/vaex that referenced this pull request Jan 31, 2022
magnunor added a commit to magnunor/hyperspy that referenced this pull request Feb 18, 2022
This is due to a change which was introduced in dask-2022.1.1.

Seems to be coming from: dask/dask#7417
Related pull request: hyperspy#2888
ludwigschwardt added a commit to ska-sa/katdal that referenced this pull request Mar 15, 2022
Dask 2022.01.1 renamed `da.core.getem` to `da.core.graph_from_arraylike`
and changed the interface (see dask/dask#7417). These functions are
still the most convenient way to create a dask array from `get_chunk`
calls and a chunk specification, so add a shim that picks the
appropriate function.

We also keep the more convenient `getem` API (I guess until the minimum
version of dask for katdal no longer has `getem`).
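
A minimal sketch of such a shim (hypothetical; only the lookup is shimmed here, since the two functions take different arguments and callers still need version-specific wiring):

from dask.array import core as da_core

# Prefer the new name on dask >= 2022.01.1, fall back to getem on older
# releases.
if hasattr(da_core, "graph_from_arraylike"):
    _graph_builder = da_core.graph_from_arraylike
else:
    _graph_builder = da_core.getem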
ludwigschwardt added a commit to ska-sa/katdal that referenced this pull request Mar 17, 2022
This is an attempt to improve on the slight performance drop of
`da.map_blocks`. Taking our cue from the internals of the new
`da.graph_from_arraylike` function, we call `dask.blockwise.blockwise`
directly. This allows us to construct a custom graph with three
arguments to our putter function (adding the actual chunk to the usual
two getter arguments). We need this because the returned graph is
now a high-level one that is harder to modify than an old-school dict.

This improves performance to equal that of the original getem version,
even surpassing it when there are many chunks. The only downside is
that classes like `ArraySliceDep` are brand new and might still change
yet again in the near future, having been added in the recent
dask/dask#7417 PR.
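
A rough sketch of that approach, assuming the dask-2022.01.1 internals (dask.blockwise.blockwise accepting a BlockwiseDep such as ArraySliceDep; names and details may differ across versions):

import numpy as np
import dask.array as da
from dask.blockwise import blockwise
from dask.highlevelgraph import HighLevelGraph
from dask.layers import ArraySliceDep

chunks = ((2, 2), (3, 3))

def read_block(slices):
    # Stand-in for a getter/putter: each task receives the tuple of slices
    # for its block via ArraySliceDep.
    return np.zeros((slices[0].stop - slices[0].start,
                     slices[1].stop - slices[1].start))

name = "read-block-sketch"
out_ind = (0, 1)
layer = blockwise(
    read_block, name, out_ind,
    ArraySliceDep(chunks), out_ind,
    numblocks={},
)
graph = HighLevelGraph.from_collections(name, layer, dependencies=())
arr = da.Array(graph, name, chunks, dtype=float)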
ericpre pushed a commit to ericpre/rosettasciio that referenced this pull request Jul 23, 2022
This is due to a change which was introduced in dask-2022.1.1.

Seems to be coming from: dask/dask#7417
Related pull request: hyperspy/hyperspy#2888