Conversation
A simplistic approach to handling zarr. Adding a note that this will fail if the chunking is not regular. Comments and opinions welcome.
mrocklin
left a comment
Generally seems sensible. A few small comments. Looking forward to seeing where this goes.
dask/array/core.py
Outdated
return nonzero(self)


def to_zarr(self, *args, **kwargs):
    to_zarr(self, *args, **kwargs)
The to_zarr function should probably return a delayed object if compute=False. This should pass it through
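A minimal sketch of the suggested pass-through, with a stand-in for the module-level function (all names below are hypothetical, not the PR's final code):

```python
def module_to_zarr(arr, *args, compute=True, **kwargs):
    # Stand-in for the module-level to_zarr: pretend it hands back a
    # delayed token when compute=False, and None after an eager store.
    return "delayed-token" if not compute else None


class Array:
    def to_zarr(self, *args, **kwargs):
        # Return (rather than discard) the result, so a Delayed object
        # produced with compute=False reaches the caller.
        return module_to_zarr(self, *args, **kwargs)
```

The only change from the quoted diff is the `return`, which is what lets `compute=False` round-trip a delayed object.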
dask/array/core.py
Outdated
    z = zarr.open_array(mapper, mode='r', **kwargs)
else:
    z = zarr.open_group(mapper, mode='r', **kwargs)[component]
return from_array(z, z.chunks, name='zarr-%s' % url)
Do we need to think about locks or Zarr synchronizers here? Or is it best to delegate that through kwargs?
I don't know what to do about that. If the chunks are guaranteed to exactly match, then there should not be any contention.
Not aware of any issues reading from Zarr in parallel (chunks matching or not).
Nope, no synchronization needed when reading.
    from hdfs3.mapping import HDFSMap
    return HDFSMap(fs, path)
else:
    raise ValueError('No mapper for protocol "%s"' % fs.protocol)
If only there was some sort of standard file system abstraction library that would have utility functions like good error messages when importing ...
z = zarr.open_group(mapper, mode=mode).create_dataset(
    component, shape=arr.shape, chunks=chunks, dtype=arr.dtype,
    **kwargs)
return store(arr, z, compute=compute, return_stored=return_stored)
Though we might want to provide the option to specify a lock for writing.
If the dask array to be stored has regular chunks, then I think a lock is not needed, because the writes will be aligned with the chunks in the newly created zarr array. In that case could probably pass lock=False to store().
Not sure what to do if dask array has irregular chunks. Storing should still be possible but writes may not be aligned with zarr array chunks. Is it worth doing something like arr = rechunk(arr, z.chunks) just to be sure writes will be aligned?
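The alignment idea can be sketched as follows; `zarr_chunks_for` is a hypothetical helper name, though taking the first chunk along each dimension is the trick the merged code in this thread also uses:

```python
def zarr_chunks_for(arr_chunks):
    """Choose the zarr chunk shape as the first chunk along each dimension
    of the dask array's chunk tuples. If the dask chunks are regular,
    every dask chunk then lands exactly on one zarr chunk, so writes are
    aligned and no lock is needed (lock=False can be passed to store)."""
    return tuple(c[0] for c in arr_chunks)
```

With irregular dask chunks, the options discussed above would be rechunking first (so writes align) or falling back to a lock.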
It sounds like we should rechunk before calling store? Presumably it's better to rechunk beforehand.
Actually, it might be nice to centralize that logic within store itself with an optional chunks= parameter. Presumably we might choose to align dask.array chunks so that they align with, but are possibly larger than, the given chunks.
I think that's exactly what I suggested in the chunks issue. Here we should apply rechunk as given, including the option of not rechunking, and error if the chunks are not regular.
Agreed using rechunk is much better than locking (that is what we do).
As to how irregular chunks should be dealt with, there is some discussion in issue ( #3302 ) about how to do this. Would be curious to hear your thoughts over there, @alimanfoo. 😉
A simple first pass solution may be just to raise if the chunks are irregular and request the user make them uniform.
Edit: Oops, sorry to duplicate @martindurant. Your post showed up after I posted this in the diff view. 😞
Thanks for doing this @martindurant. This looks great! :) Expect @alimanfoo would be interested in looking at this. ;)
@jakirkham is also trying to maintain a list of optional dependencies in #3456. Do we know the minimum version of Zarr for which this will work?
Would encourage requiring 2.2.0 to start unless there is some reason to start with an older version. There is enough of a difference between 2.1.x and 2.2.0 that it would probably just be easier to start with a recent requirement. Am pretty sure that is what xarray is using as well. Though @rabernat and @jhamman would know more.
dask/array/core.py
Outdated
----------
url: str
    Location of the data. This can include a protocol specifier like s3://
    for remote data.
I wonder if it's worth allowing this first argument to either be a string (in which case interpreted as url) or a mapping. Then you'd get full generality to use any type of store, including zip files, lmdb, etc.
A suitable mapping is any instance of collections.MutableMapping?
That sounds correct (relevant doc snippet below). There are some operations that can be optimized by defining some special internal methods, but they are not strictly required.
Note that any object implementing the MutableMapping interface from the collections module in the Python standard library can be used as a Zarr array store, as long as it accepts string (str) keys and bytes values.
ref: http://zarr.readthedocs.io/en/latest/api/storage.html#module-zarr.storage
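For illustration, a plain dict already satisfies that contract (in modern Python the ABC lives in collections.abc; older versions also exposed it from collections directly):

```python
from collections.abc import MutableMapping

# A plain dict is a MutableMapping, so zarr can use it as an in-memory
# store: chunk keys are strings, chunk payloads are raw bytes.
store = {}
store["data/0.0"] = b"\x00" * 16

print(isinstance(store, MutableMapping))  # True
```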
That said, I have seen a lot of work that you have been doing involving URLs and paths on the DataFrame side of the code base (though I am not too familiar with what is going on there). Maybe it's worth mentioning what you had in mind and how that compares to what has already been done with DataFrames.
FWIW I think you could check if the argument is a string, if so interpret as URL, otherwise assume it's a mapping (let duck typing occur naturally by passing through to zarr). But if you want an explicit check, all store classes in zarr do sub-class from MutableMapping, so should be fine too.
I think the first of those two suggestions makes the most sense to me.
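That check-string-else-duck-type dispatch might look like this (`resolve_store` and the `get_mapper` callable are illustrative names, not the PR's API):

```python
def resolve_store(url_or_store, get_mapper):
    """If given a string, treat it as a URL and build a key-value mapper
    for it via get_mapper (a stand-in for dask's protocol-aware helper).
    Anything else is assumed to already be a mapping and is passed through
    untouched, letting duck typing occur naturally inside zarr."""
    if isinstance(url_or_store, str):
        return get_mapper(url_or_store)
    return url_or_store
```

A quick check of both branches: `resolve_store({}, None)` returns the dict unchanged, while `resolve_store("s3://bucket/x", mapper_factory)` goes through the factory.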
dask/array/core.py
Outdated
    Data to store
url: str
    Location of the data. This can include a protocol specifier like s3://
    for remote data.
Again wonder if this could be string or mapping.
dask/array/core.py
Outdated
return Array(dsk, name, chunks, dtype=x.dtype)


def from_zarr(url, component=None, storage_options=None, **kwargs):
It would be nice to have an option to override the chunk size. Generally I have found the chunk size we choose for storage on disk and what we choose for Dask are different. That might not be everyone's case though, so having it default to the on-disk chunks seems sensible. Just the option to override would be good.
Should add that choosing a different chunk size to start is way faster than rechunking afterwards. So having the option is pretty important.
The docstring for the rechunk= parameter suggests that rechunk will allow various chunking schemes. The intent is to have None become the default, and to decide which scheme is best for it.
dask/array/core.py
Outdated
    False
    """
for chunks in chunkset:
    if len(chunks) < 2:
Minor point: From a readability perspective == 1 is a bit clearer.
Is it possible to have an empty array with no chunks?
Good question! If it is an empty array, chunkset is an empty tuple, meaning we don't enter this for-loop. If it is a 0-D array, then chunkset is ((0,),), so chunks would be length 1. For any higher-dimensional array, there would need to be at least one chunk per dimension with some length, meaning each would be at least length 1.
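A sketch of the regularity check being discussed, covering the empty and 0-D cases (`check_regular_chunks` here is an assumed name, not necessarily the PR's implementation):

```python
def check_regular_chunks(chunkset):
    """Return True if every dimension's chunks are uniform, except that
    the final chunk in a dimension may be smaller (a trailing remainder).
    An empty array gives chunkset == (), so the loop body never runs;
    a 0-d array gives ((0,),), where len(chunks) == 1."""
    for chunks in chunkset:
        if len(chunks) == 1:
            continue
        if len(set(chunks[:-1])) > 1:   # interior chunks must all match
            return False
        if chunks[-1] > chunks[0]:      # last chunk may only be smaller
            return False
    return True
```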
dask/array/core.py
Outdated
"""
import zarr
if rechunk is not False:
    arr = arr.rechunk(rechunk)
This seems to allow None here, which would cause an exception. What would we like to do if rechunk is None?
Right, I haven't implemented any of my suggestions into rechunk yet, but presumably we will need to pick a default. If preferred, this could be called "default".
See the example simple rechunker I pushed. This could genuinely be a useful one, but as I commented before, I expect there to be a few of these, and we can discuss which makes the best default.
Agreed that does seem useful.
Should we pick one for None at this stage? Also should we specify strs as valid arguments for rechunk then?
We can pick now, or I can continue to make some more.
Not sure if that belongs in this PR, though. The set of rechunkers ought to have expensive tests. At this point, I pushed it only so that you could see what I had in mind.
Sure. We can save the discussion for another PR. Thanks for sharing.
Perhaps this should happen in a future PR? If so we should probably remove these lines.
IIUC we decided to add this to avoid locking during writes. If I'm misunderstanding, please feel free to correct me. We can certainly consider locking or other options. rechunking seems easiest.
That said, there were a lot of ideas floating around about how best to rechunk in different scenarios and make these available to the user. So wouldn't want us to restrict ourselves early while we are still exploring that.
Personally don't have a strong feelings as to whether we keep this in this PR given that last point. We may want to add a note to the docstring about regular chunks being required if we don't supply this. As we already raise for irregular chunks with a nice error message, we should avoid a lot of problems that could come up.
Happy to defer to others on this.
So I pulled out the rechunk options for now, but implemented passing a mapping as suggested (simple string check).
Anything more suggested here?
Would it be possible to have an optional
Certainly possible, that would be equivalent to
It would be passing the
I hadn't realised that - now I understand your comment from before.
dask/array/core.py
Outdated
Rechunking to be applied to the array before storage, since zarr
requires a regular chunks scheme - passed to ``.rechunk()``.
If False, no rechunk operation is performed;
if the chunks are not regular, an exception is raised.
I think this text now accurately describes the situation: it says nothing about how to rechunk, only that you might need to do it. The exception message is readable, I'd like to think. Rechunking for storage should always be allowed, and any function that passes on to store should allow it.
Removing
docs/source/array-creation.rst
Outdated
The `zarr`_ format is a chunk-wise binary array storage file format, with a good selection
of encoding and compression options. Due to each chunk being stored in a separate file, it
is ideal for parallel access in both reading and writing (for the latter, if the dask array
Generally looks fine to me. Though kind of dealing with a fever ATM, so might not be the best reviewer. Sorry about that. Thanks for working on this @martindurant. Looking forward to using it. 😄
FWIW this is all looking good to me. Only thing I noticed is the overwrite_group parameter, I'm not sure this is quite right. I'll try to follow up with some explanation tomorrow.
dask/array/core.py
Outdated
mode = 'w' if overwrite_group else 'r+'
z = zarr.open_group(mapper, mode=mode).create_dataset(
    component, shape=arr.shape, chunks=chunks, dtype=arr.dtype,
    **kwargs)
I think it would be better to have an overwrite argument into this function, rather than an overwrite_group argument.
Then this whole if component is None: ... else: ... block could be replaced by:
z = zarr.create(shape=arr.shape, chunks=chunks, dtype=arr.dtype, store=mapper, path=component, overwrite=overwrite, **kwargs)
As well as being simpler code, this also would simplify the logic around whether to overwrite existing data, because the behaviour will be the same whether or not the component argument is provided. I.e., with this change, if the user provides overwrite=False, then if an array exists an exception will be raised. Conversely, if user provides overwrite=True, an existing array will be deleted and overwritten.
I didn't realise it could be done as simply as that!
Yeah the open... functions are sometimes a bit of a distraction.
Is it worth deprecating/consolidating them?
Yes worth considering. Raised zarr-developers/zarr-python#264 for discussion.
docs/source/changelog.rst
Outdated
Array
+++++

<<<<<<< HEAD
Think this crept in from a merge conflict resolution.
dask/array/core.py
Outdated
if component is None:
    z = zarr.open_array(mapper, mode='r', **kwargs)
else:
    z = zarr.open_group(mapper, mode='r', **kwargs)[component]
FWIW you could replace the if component is None else block here with:
z = zarr.Array(store=mapper, path=component, read_only=True, **kwargs)
dask/bytes/core.py
Outdated
def get_mapper(fs, path):
    # This is not the right way to do this.
dask/array/core.py
Outdated
If given array already exists, overwrite=False will cause an error,
where overwrite=True will replace the existing data.
compute, return_stored: see ``store()``
kwargs: passed to zarr's open functions, e.g., compression options
...passed to the zarr.create() function...
dask/array/core.py
Outdated
Passed to ``da.from_array``, allows setting the chunks on
initialisation, if the chunking scheme in the on-disc dataset is not
optimal for the calculations to follow.
kwargs: passed to zarr's open functions.
If we simplify below, then maybe update this doc line too. Although we probably don't need kwargs at all; there isn't really anything the user would want to pass, I think, although it doesn't hurt to leave it for future compat.
return from_array(z, chunks, name='zarr-%s' % url)


def to_zarr(arr, url, component=None, storage_options=None,
One more thing I forgot to say. The url argument could default to None. This would have the effect of creating a new in-memory array. The user would have to also provide return_stored=True to be able to receive the new array, so there could be a gotcha with the current default return_stored=False (maybe that should be True here?).
Might be convenient in the case where you want to compute a result without having to bother about putting any data down on disk. I.e., given some Dask array d, would be nice to be able to just do z = d.to_zarr() and get back an in-memory zarr array. In my work there's plenty of cases where I use in-memory zarr arrays, because it's quick and convenient and data are small enough with compression to fit in memory.
Don't mind if you'd rather leave as-is, just a thought.
The url argument could default to None. This would have the effect of creating a new in-memory array
I would think this is stretching the idea of what this method does, and it would be unexpected. A better approach maybe would be #2741 (i.e., a URL explicitly starting with 'memory://'), but the reason that PR is languishing is that it's not clear what should happen in distributed memory.
Would just passing in a Zarr Array instance for url work for that case? If so, maybe this is already possible with a slightly different set of parameters. ;)
FWIW I think it's good as-is. Might get a bit confusing if there is too much flexibility around what the url arg can be. And wouldn't gain much convenience, user would have to set up array themselves. My (lazy) use case is wanting to be able to do z = d.to_zarr(), which is like calling d.compute() but where the result is computed into an in-memory zarr array rather than numpy array. But happy to revisit that later, don't want to hold up this very nice PR :)
Took the liberty of resolving merge conflicts. Hope that is ok.
Sounds like we are happy with this. So going to get it in.
Thanks for working on this @martindurant. Very nice addition. Also thanks everyone for helping review. Will be great to play with this in the next release. :)
Thanks for the rebase and conversation while I was away :)
chunks = [c[0] for c in arr.chunks]
z = zarr.create(shape=arr.shape, chunks=chunks, dtype=arr.dtype,
                store=mapper, path=component, overwrite=overwrite, **kwargs)
return store(arr, z, compute=compute, return_stored=return_stored)
Missed that we weren't actually setting lock=False here. Fixing in PR ( #3607 ).