
add to/read_zarr #3460

Merged
jakirkham merged 19 commits into dask:master from martindurant:zarr
May 26, 2018
Conversation

@martindurant
Member

@martindurant martindurant commented May 2, 2018

  • Tests added / passed
  • Passes flake8 dask

Martin Durant added 2 commits May 2, 2018 15:06
@martindurant
Member Author

martindurant commented May 2, 2018

A simplistic approach to handling zarr.

Adding the get_mapper function feels rather wrong, and it would be much better if the filesystem instances provided a get_mapper method - something that could be implemented automatically in fsspec, if filesystems chose to adopt that framework.

Note that this will fail if the chunking is not regular.

Comments and opinions welcome.

@martindurant
Member Author

Ref #3302
(eventually) fixes #3457

Member

@mrocklin mrocklin left a comment


Generally seems sensible. A few small comments. Looking forward to seeing where this goes.

return nonzero(self)

def to_zarr(self, *args, **kwargs):
to_zarr(self, *args, **kwargs)
Member

The to_zarr function should probably return a delayed object if compute=False. This should pass it through

z = zarr.open_array(mapper, mode='r', **kwargs)
else:
z = zarr.open_group(mapper, mode='r', **kwargs)[component]
return from_array(z, z.chunks, name='zarr-%s' % url)
Member

Do we need to think about locks or Zarr synchronizers here? Or is it best to delegate that through kwargs?

Member Author

I don't know what to do about that. If the chunks are guaranteed to exactly match, then there should not be any contention.

Member

Not aware of any issues reading from Zarr in parallel (chunks matching or not).

Contributor

Nope, no synchronization needed when reading.

from hdfs3.mapping import HDFSMap
return HDFSMap(fs, path)
else:
raise ValueError('No mapper for protocol "%s"' % fs.protocol)
Member

If only there was some sort of standard file system abstraction library that would have utility functions like good error messages when importing ...

Member

:)

@martindurant martindurant changed the title WIP: First stab at zarr add to/read_zarr May 4, 2018
z = zarr.open_group(mapper, mode=mode).create_dataset(
component, shape=arr.shape, chunks=chunks, dtype=arr.dtype,
**kwargs)
return store(arr, z, compute=compute, return_stored=return_stored)
Member

Though we might want to provide the option to specify a lock for writing.

Contributor

If the dask array to be stored has regular chunks, then I think a lock is not needed, because the writes will be aligned with the chunks in the newly created zarr array. In that case could probably pass lock=False to store().

Not sure what to do if dask array has irregular chunks. Storing should still be possible but writes may not be aligned with zarr array chunks. Is it worth doing something like arr = rechunk(arr, z.chunks) just to be sure writes will be aligned?

Member

It sounds like we should rechunk before calling store? Presumably it's better to rechunk beforehand.

Member

Actually, it might be nice to centralize that logic within store itself with an optional chunks= parameter. Presumably we might choose to align dask.array chunks so that they align with, but are possibly larger than, the given chunks

Member Author

I think that's exactly what I suggested in the chunks issue. Here we should apply rechunk as given, including the option of not rechunking, and error if the chunks are not regular.

Member

@jakirkham jakirkham May 4, 2018

Agreed using rechunk is much better than locking (that is what we do).

As to how irregular chunks should be dealt with, there is some discussion in issue ( #3302 ) about how to do this. Would be curious to hear your thoughts over there, @alimanfoo. 😉

A simple first pass solution may be just to raise if the chunks are irregular and request the user make them uniform.

Edit: Oops, sorry to duplicate @martindurant. Your post showed up after I posted this in the diff view. 😞

@jakirkham
Member

Thanks for doing this @martindurant. This looks great! :)

Expect @alimanfoo would be interested in looking at this. ;)

@mrocklin
Member

mrocklin commented May 4, 2018

@jakirkham is also trying to maintain a list of optional dependencies in #3456

Do we know the minimum version of Zarr for which this will work?

@jakirkham
Member

Would encourage requiring 2.2.0 to start unless there is some reason to start with an older version. There is enough of a difference between 2.1.x and 2.2.0 that it would probably just be easier to start with a recent requirement. Am pretty sure that is what xarray is using as well. Though @rabernat and @jhamman would know more.

Contributor

@alimanfoo alimanfoo left a comment

Great to see this PR, will be very useful!

----------
url: str
Location of the data. This can include a protocol specifier like s3://
for remote data.
Contributor

I wonder if it's worth allowing this first argument to either be a string (in which case interpreted as url) or a mapping. Then you'd get full generality to use any type of store, including zip files, lmdb, etc.

Member Author

A suitable mapping is any instance of collections.MutableMapping?

Member

That sounds correct (relevant doc snippet below). There are some operations that can be optimized by defining some special internal methods, but they are not strictly required.

Note that any object implementing the MutableMapping interface from the collections module in the Python standard library can be used as a Zarr array store, as long as it accepts string (str) keys and bytes values.

ref: http://zarr.readthedocs.io/en/latest/api/storage.html#module-zarr.storage

Member

That said, have seen a lot of work that you have been doing involving URLs and paths on the DataFrame side of the code base (though am not too familiar with what is going on there). Maybe it's worth mentioning what you had in mind and how that compares to what has been done with DataFrames already.

Contributor

FWIW I think you could check if the argument is a string, if so interpret as URL, otherwise assume it's a mapping (let duck typing occur naturally by passing through to zarr). But if you want an explicit check, all store classes in zarr do sub-class from MutableMapping, so should be fine too.

Member Author

I think the first of those two suggestions makes the most sense to me.

Data to store
url: str
Location of the data. This can include a protocol specifier like s3://
for remote data.
Contributor

Again wonder if this could be string or mapping.

return Array(dsk, name, chunks, dtype=x.dtype)


def from_zarr(url, component=None, storage_options=None, **kwargs):
Member

It would be nice to have an option to override the chunks size. Generally have found the chunk size that we choose for storage on disk and what we choose for Dask are different. That might not be everyone's case though. So having it default to the on disk chunk seems sensible. Just the option to override would be good.

Member

Should add that choosing a different chunk size to start is way faster than rechunking afterwards. So having the option is pretty important.

Martin Durant added 2 commits May 7, 2018 09:58
Docstring for rechunk= parameter suggests that rechunk will
allow various chunking schemes. Intent is to have None become the
default, and decide which scheme is best for it.
False
"""
for chunks in chunkset:
if len(chunks) < 2:
Member

Minor point: From a readability perspective == 1 is a bit clearer.

Member Author

Is it possible to have an empty array with no chunks?

Member

Good question! If it is an empty array, chunkset is an empty tuple, meaning we don't enter this for-loop. If it is a 0-D array, then chunkset is ((0,),), so chunks would be length 1. For any higher-dimensional array, there must be at least one chunk per dimension, so each chunks tuple would be at least length 1.
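The regularity check being discussed can be written as a small standalone function. A minimal sketch based on the rules in this thread (the function name is illustrative, not dask's exact API): along each dimension all chunks must be equal, except the last, which may be smaller.

```python
def check_regular_chunks(chunkset):
    """Return True if a dask-style chunk layout (tuple of per-dimension
    tuples) is regular enough to map onto zarr chunks."""
    for chunks in chunkset:
        if len(chunks) == 1:
            continue  # a single chunk per dimension is always fine
        if len(set(chunks[:-1])) > 1:
            return False  # interior chunks differ in size
        if chunks[-1] > chunks[0]:
            return False  # final chunk may be smaller, never larger
    return True

assert check_regular_chunks(((4, 4, 2), (3, 3, 3)))  # regular
assert not check_regular_chunks(((4, 2, 4),))        # irregular interior
assert check_regular_chunks(())                      # empty array: no dims
assert check_regular_chunks(((0,),))                 # 0-D case from above
```

The last two assertions exercise exactly the edge cases raised in this exchange: an empty chunkset never enters the loop, and a 0-D array's single length-1 tuple is skipped by the first branch.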

"""
import zarr
if rechunk is not False:
arr = arr.rechunk(rechunk)
Member

This seems to allow None here, which would cause an exception. What would we like to do if rechunk is None?

Member Author

Right, I haven't implemented any of my suggestions into rechunk yet, but presumably we will need to pick a default. If preferred, this could be called "default".

Member Author

See the example simple rechunker I pushed. This could genuinely be a useful one, but as I commented before, I expect there to be a few of these, and we can discuss which makes the best default.

Member

Agreed that does seem useful.

Should we pick one for None at this stage? Also should we specify strs as valid arguments for rechunk then?

Member Author

We can pick now, or I can continue to make some more.
Not sure if that belongs in this PR, though. The set of rechunkers ought to have expensive tests. At this point, I pushed it only so that you could see what I had in mind.

Member

Sure. We can save the discussion for another PR. Thanks for sharing.

Member

Perhaps this should happen in a future PR? If so we should probably remove these lines.

Member

IIUC we decided to add this to avoid locking during writes. If I'm misunderstanding, please feel free to correct me. We can certainly consider locking or other options. rechunking seems easiest.

That said, there were a lot of ideas floating around about how best to rechunk in different scenarios and make these available to the user. So wouldn't want us to restrict ourselves early while we are still exploring that.

Personally I don't have strong feelings as to whether we keep this in this PR given that last point. We may want to add a note to the docstring about regular chunks being required if we don't supply this. As we already raise for irregular chunks with a nice error message, we should avoid a lot of problems that could come up.

Happy to defer to others on this.

@martindurant
Member Author

So I pulled out the rechunk options for now, but implemented passing a mapping as suggested (simple string check).

@martindurant
Member Author

Anything more suggested here?

@jakirkham
Member

Would it be possible to have an optional chunks parameter in from_zarr to override using the on disk chunking?

@martindurant
Member Author

Certainly possible, that would be equivalent to from_zarr().rechunk(), right?

@jakirkham
Member

It would be passing the chunks into from_array. Turns out there’s a big performance difference between doing this and using rechunk afterwards.

@martindurant
Member Author

I hadn't realised that - now I understand your comment from before.

Rechunking to be applied to the array before storage, since zarr
requires a regular chunks scheme - passed to ``.rechunk()``.
If False, no rechunk operation is performed;
if the chunks are not regular, an exception is raised.
Member Author

I find that this text now accurately describes the situation, and says nothing about how to rechunk, only that you might need to do it. The exception is readable, I'd like to think. Rechunking for storage should always be allowed, I think, and any function that passes on to store should allow it.

@martindurant
Member Author

Removing the rechunk= parameter, since leaving it out only adds one line of code for users (an explicit rechunk call), and the irregular-chunks case gives a decent error message. I am not convinced that this is the right thing to do, but indeed we can add it back later.


The `zarr`_ format is a chunk-wise binary array storage file format, with a good selection
of encoding and compression options. Due to each chunk being stored in a separate file, it
is ideal for parallel access i both reading and writing (for the latter, if the dask array
Member

@jakirkham jakirkham May 22, 2018

i -> in

@jakirkham
Member

Generally looks fine to me. Though kind of dealing with a fever ATM. So might not be the best reviewer. Sorry about that.

Thanks for working on this @martindurant. Looking forward to using it. 😄

@alimanfoo
Contributor

FWIW this is all looking good to me. Only thing I noticed is the overwrite_group parameter, I'm not sure this is quite right. I'll try to follow up with some explanation tomorrow.

mode = 'w' if overwrite_group else 'r+'
z = zarr.open_group(mapper, mode=mode).create_dataset(
component, shape=arr.shape, chunks=chunks, dtype=arr.dtype,
**kwargs)
Contributor

@alimanfoo alimanfoo May 23, 2018

I think it would be better to have an overwrite argument into this function, rather than an overwrite_group argument.

Then this whole if component is None: ... else: ... block could be replaced by:

z = zarr.create(shape=arr.shape, chunks=chunks, dtype=arr.dtype, store=mapper, path=component, overwrite=overwrite, **kwargs) 

As well as being simpler code, this also would simplify the logic around whether to overwrite existing data, because the behaviour will be the same whether or not the component argument is provided. I.e., with this change, if the user provides overwrite=False, then if an array exists an exception will be raised. Conversely, if user provides overwrite=True, an existing array will be deleted and overwritten.

Member Author

I didn't realise it could be done as simply as that!

Contributor

Yeah the open... functions are sometimes a bit of a distraction.

Member

Is it worth deprecating/consolidating them?

Contributor

Yes worth considering. Raised zarr-developers/zarr-python#264 for discussion.

Array
+++++

<<<<<<< HEAD
Member

Think this crept in from a merge conflict resolution.

if component is None:
z = zarr.open_array(mapper, mode='r', **kwargs)
else:
z = zarr.open_group(mapper, mode='r', **kwargs)[component]
Contributor

FWIW you could replace the if component is None else block here with:

z = zarr.Array(store=mapper, path=component, read_only=True, **kwargs)



def get_mapper(fs, path):
# This is not the right wayt o do this.
Member

nit: wayt o -> way to

If given array already exists, overwrite=False will cause an error,
where overwrite=True will replace the existing data.
compute, return_stored: see ``store()``
kwargs: passed to zarr's open functions, e.g., compression options
Contributor

...passed to the zarr.create() function...

Passed to ``da.from_array``, allows setting the chunks on
initialisation, if the chunking scheme in the on-disc dataset is not
optimal for the calculations to follow.
kwargs: passed to zarr's open functions.
Contributor

If we simplify below, then maybe update this doc line too. Although we probably don't need kwargs at all; there isn't really anything the user would want to pass, I don't think, although it doesn't hurt to leave it for future compat.

Contributor

@alimanfoo alimanfoo left a comment

All looks good to me.

return from_array(z, chunks, name='zarr-%s' % url)


def to_zarr(arr, url, component=None, storage_options=None,
Contributor

One more thing forgot to say. The url argument could default to None. This would have the effect of creating a new in-memory array. User would have to also provide return_stored=True to be able to receive the new array, so there could be a gotcha with current default return_stored=False (maybe that should be True here?).

Might be convenient in the case where you want to compute a result without having to bother about putting any data down on disk. I.e., given some Dask array d, would be nice to be able to just do z = d.to_zarr() and get back an in-memory zarr array. In my work there's plenty of cases where I use in-memory zarr arrays, because it's quick and convenient and data are small enough with compression to fit in memory.

Don't mind if you'd rather leave as-is, just a thought.

Member Author

The url argument could default to None. This would have the effect of creating a new in-memory array

I would think this is stretching the idea of what this method does, and would be unexpected. A better approach might be #2741 (i.e., a URL explicitly starting with 'memory://'), but the reason that PR is languishing is that it's not clear what should happen in distributed memory.

Contributor

OK, no problem.

Member

Would just passing in a Zarr Array instance for url work for that case? If so, maybe this is already possible with a slightly different set of parameters. ;)

Contributor

FWIW I think it's good as-is. Might get a bit confusing if there is too much flexibility around what the url arg can be. And wouldn't gain much convenience, user would have to set up array themselves. My (lazy) use case is wanting to be able to do z = d.to_zarr(), which is like calling d.compute() but where the result is computed into an in-memory zarr array rather than numpy array. But happy to revisit that later, don't want to hold up this very nice PR :)


@jakirkham
Member

Took the liberty of resolving merge conflicts. Hope that is ok.

@jakirkham
Member

Sounds like we are happy with this. So going to get it in.

@jakirkham jakirkham merged commit 79c181a into dask:master May 26, 2018
@jakirkham
Member

Thanks for working on this @martindurant. Very nice addition. Also thanks everyone for helping review. Will be great to play with this in the next release. :)

@martindurant martindurant deleted the zarr branch May 27, 2018 23:44
@martindurant
Member Author

Thanks for the rebase and conversation while I was away :)

chunks = [c[0] for c in arr.chunks]
z = zarr.create(shape=arr.shape, chunks=chunks, dtype=arr.dtype,
store=mapper, path=component, overwrite=overwrite, **kwargs)
return store(arr, z, compute=compute, return_stored=return_stored)
Member

Missed that we weren't actually setting lock=False here. Fixing in PR ( #3607 ).
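As an aside, the `chunks = [c[0] for c in arr.chunks]` line in the merged code above works because dask stores chunks as a tuple of per-dimension tuples; taking the first entry per dimension recovers the uniform chunk shape zarr expects. A pure-Python illustration (the example shape is made up), valid only when the chunks are regular:

```python
# dask-style chunks for a hypothetical 612 x 256 array: three chunks
# along axis 0 (the last one smaller) and two along axis 1.
dask_style_chunks = ((256, 256, 100), (128, 128))

# First entry per dimension gives the uniform chunk shape for zarr.
zarr_chunks = [c[0] for c in dask_style_chunks]
assert zarr_chunks == [256, 128]
```

This is why the earlier discussion insists on regular chunks (or rechunking first): with irregular chunks, `c[0]` would not describe the other chunks in that dimension.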

