Conversation
This is a proof of concept around the proposal in dask#4329. The different engines are responsible for returning a reader function, a list of tokens/kwargs to pass to that reader function, and an optional list of statistics. We pull all other book-keeping out into a master function to avoid duplicating tricky logic. I've only done this for pyarrow so far. Feedback appreciated.
|
@martindurant @xhochy if either of you have time to look over the changes here I would appreciate it. My goal here is to propose a design around the dask.dataframe.io.parquet module which removes burden from engine authors and places as much shared logic into core as possible. |
|
Is this independent of #4335? |
|
That change can stand on its own, but it was a small step en route to this. Assuming limited review time I would ask people to focus on this PR, and the second commit in particular. |
|
I've updated the header comment and also added in some additional logic around finding sorted columns/index and removing empty partitions. Feedback would be appreciated. My hope is that the pyarrow implementation is now a bit simpler. |
martindurant
left a comment
A few thoughts for you. Generally, the changes are fine, but I'm not yet convinced how much simpler this layout is. That may become clear when the fastparquet arm is also done.
dask/dataframe/io/parquet.py
Outdated
    def _read_pyarrow(fs, fs_token, paths, columns=None, filters=None,
-                     categories=None, index=None, infer_divisions=None):
+                     categories=None, index=None, gather_statistics=None):
Part of the apparent complexity of the module is, I think, the lack of docstrings on all these functions. This may be a good time to add at least one-liners for the helper functions.
dask/dataframe/io/parquet.py
Outdated
    }
    if gather_statistics:
        # Read from _metadata file
        if dataset.metadata and dataset.metadata.num_row_groups == len(dataset.pieces):  # one rg per piece
I wasn't aware that the metadata was exposed like this - is it exactly the same as a _metadata file?
It appears to have all of the information that we need and responds quickly, so I suspect so.
dask/dataframe/io/parquet.py
Outdated
    # Read from each individual piece (possibly slow)
    else:
        row_groups = [
            piece.get_metadata(lambda fn: pq.ParquetFile(fs.open(fn, mode='rb'))).row_group(0)
So this is gather_statistics=True, but no metadata? I wonder if a speed warning here might be appropriate if there are many files, or if we find that each has a large number of columns.
Yes. I'm not sure if we actually want to do this. My guess is that we want to do something like gather statistics if there is a metadata file, or maybe if we're using a local file system, but otherwise don't unless the user explicitly tells us to.
I would really add a warning here. From my personal experience, when you have a medium-sized Parquet dataset on a (cloud) object store like AWS S3 or Azure Blob Storage, this will be unusably slow. It is not easy for a first-time user to understand the problem, and thus they will get frustrated and abandon dask/parquet.
Maybe could have a timer that warns if the process is taking longer than some limit?
I propose a statistics=None keyword that can be
- True: get statistics, even if no metadata file is present
- False: don't get statistics, even if a metadata file is present
- None: get statistics only if a metadata file is present
In this way users need to explicitly opt in to expensive behavior.
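The proposed tri-state default could be sketched roughly as follows. The function and argument names here are illustrative only, not the actual dask API:

```python
# Sketch of the proposed tri-state statistics keyword.
# `decide_gather_statistics` and `has_metadata_file` are hypothetical
# names used for illustration; they are not part of dask.
def decide_gather_statistics(statistics, has_metadata_file):
    """Return True if row-group statistics should be collected."""
    if statistics is not None:
        # An explicit True/False from the user always wins
        return statistics
    # Default (None): only gather when a _metadata file makes it cheap
    return has_metadata_file
```

With this logic, the expensive per-file scan only happens when the user passes `statistics=True` explicitly.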
If we start focusing on Arrow and adopt a policy like this then it would be good to be able to write metadata files with Arrow. Reported upstream in ARROW-1983.
dask/dataframe/io/parquet.py
Outdated
            'max': column.statistics.max,
            'null_count': column.statistics.null_count})
        s['columns'].append(d)
    stats.append(s)
Side note: can we somehow make use of the a-priori knowledge of the number of rows, for len(df) or map_partitions(len)?
I think that this would add too much complexity. If you have suggestions on how to do it well then those would be welcome, though probably in another issue.
Coming back to this from before...
I think an implementation would amount to either a subclass of DataFrame which knows how to do len or, probably better, an optimisation specific to parquet when a len is requested - along the lines of what the previous column-selection was doing.
I have no objection in principle to tracking more metadata, though I do think that it will be hard to do in general.
Would you mind if we defer this conversation to future work?
Certainly, was just scrolling through the comments and noticed this. I think generally it would be very useful to Dask to pass on more information where possible, to avoid doing extra work.
dask/dataframe/io/parquet.py
Outdated
    if isinstance(columns, str):
        df = read_parquet(path, [columns], filters, categories, index,
                          storage_options, engine, infer_divisions)
        return df[columns]
This becomes the way to make a series?
The previous asserts on columns being a list could be dropped I think.
Yes. It seems simpler and centralizes a bunch of logic that was way further down. This change is isolated to the first commit, which has many more deletions than additions.
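For illustration, the string-column behavior being discussed amounts to something like the toy stand-in below. This is not the real read_parquet signature; the fixed DataFrame plays the role of the dataset:

```python
import pandas as pd

def read_parquet(path, columns=None):
    """Toy stand-in for read_parquet, showing only how a string-valued
    `columns` selects a single column as a Series."""
    if isinstance(columns, str):
        # Recurse with a one-element list, then select the column, so all
        # downstream logic only ever sees list-valued `columns`.
        df = read_parquet(path, [columns])
        return df[columns]
    # Pretend the dataset is a fixed DataFrame, purely for illustration
    data = pd.DataFrame({"x": [1, 2], "y": [3, 4]})
    return data[columns] if columns is not None else data
```

Centralizing the string-to-list handling at the top means the per-engine readers never need their own Series logic.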
dask/dataframe/io/parquet.py
Outdated
    paths = sorted(paths, key=natural_sort_key)  # numeric rather than glob ordering

    func, meta, statistics, parts = read(fs, fs_token, paths, columns=columns, filters=filters,
Do we actually use the fs token? Seems like open_files would be cleaner, if possible, and I see the name, above, doesn't use it.
dask/dataframe/io/parquet.py
Outdated
        divisions = out[0]['divisions']
        index = out[0]['name']
    elif len(out) > 1:
        warnings.warn("Multiple sorted columns found: " + ", ".join(o['name'] for o in out))
i.e., the user should ideally pick one? If they have picked one, then we don't care that multiple possible index columns exist. Conversely, they may have said no index, or the given index column is not sorted, so there being exactly one sorted column shouldn't mean it automatically becomes the index.
Yes, ideally we listen to the index= keyword here. I haven't gotten to this yet.
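The sorted-column detection under discussion works off per-partition min/max statistics. A simplified sketch of the idea (illustrative names, not the exact helper in this PR):

```python
def sorted_divisions(stats):
    """Given per-partition [{'min': ..., 'max': ...}, ...] statistics for
    one column, return dask-style divisions if the partitions are globally
    sorted on that column, else None.  A simplified illustration."""
    if not stats:
        return None
    divisions = []
    for i, s in enumerate(stats):
        if s["min"] is None or s["max"] is None or s["min"] > s["max"]:
            return None
        if i > 0 and s["min"] < stats[i - 1]["max"]:
            return None  # ranges overlap, so partitions are not globally sorted
        divisions.append(s["min"])
    divisions.append(stats[-1]["max"])
    return divisions
```

A column yielding non-None divisions is a candidate index; the warning above fires when more than one column qualifies.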
dask/dataframe/io/parquet.py
Outdated
def read_parquet_part(func, fs, meta, part, columns, index, kwargs):
    """ Read a part of a parquet dataset """
    df = func(fs, part, columns, **kwargs)
Comment here to point to the two possible values of func, which would otherwise be tricky to divine here.
If the file is again opened within the pyarrow function (it certainly would be for fastparquet), then OpenFiles would be better than passing around the filesystem (@jcrist , agree on this point?).
What do you mean by the two possible values of func? Which values are you referring to?
One for pyarrow, one for fastparquet
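To make the dispatch concrete: `func` is whichever engine-specific piece reader was returned earlier, one supplied by the pyarrow engine and one by the fastparquet engine. A hedged sketch, using a hypothetical fake reader in place of a real engine:

```python
import pandas as pd

def read_parquet_part(func, fs, meta, part, columns, index, kwargs):
    """Read one part of a parquet dataset using the engine's reader.

    `func` is engine-specific: each engine (pyarrow, fastparquet, ...)
    supplies its own piece-reading function.
    """
    df = func(fs, part, columns, **kwargs)
    if index is not None:
        df = df.set_index(index)
    return df

# Hypothetical engine reader, used only for illustration
def fake_reader(fs, part, columns, **kwargs):
    return pd.DataFrame({"i": [0, 1], "x": [10, 20]})[columns]
```

A comment in the real function naming the possible readers, as suggested above, would make this indirection much easier to follow.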
dask/dataframe/io/parquet.py
Outdated
    return out


def sorted_columns(statistics):
Some helper functions are hidden, some are not. Perhaps all functions should be normal and documented?
We're removing a lot of burden from the backends. Complexity isn't significantly reduced overall; it's just moved to a central location and hopefully standardized.
Do you have any interest in engaging on this? I could use help. |
Interest, yes, but I don't want to promise time - I'll be planning effort allocation in the next week or so. |
Alternatively, maybe this could be a time to reconsider what we support long term? My inclination here is to see what I can clean up to make things more cleanly extensible, targeting the pyarrow interface as the first customer. I suspect that we'll run into issues with pyarrow, which I plan to push on or work on upstream to see if we can get things working smoothly. If that goes well and we can get all of the support we need from Arrow then I'd be inclined to sunset our support of fastparquet. Also, for transparency, I'm working on this because I expect that NVIDIA will have a cudf parquet reader prototype in a few weeks, and am looking into how expensive it would be to hook it up to dask.dataframe. |
This includes a few small changes after going through a few tests in the test suite.
|
So, what happens if I get things nicely working with Arrow, but not yet with Fastparquet? Do we wait on this PR until someone implements the fastparquet side of this rewrite, or do we merge without fastparquet support? |
|
In https://github.com/mrocklin/dask/pulls#issuecomment-501911299 @rjzamora says: I have worked through many of the tests. A few notes:
@mrocklin @martindurant Please let me know if we want/need the behavior of |
|
I think it would be better to be explicit about the index rather than automatically detect. The exception would be if you want to set Dask-specific metadata in the file footer to indicate which field should be used as the index |
I know @mrocklin has also suggested this in the past, but it seems a shame to write the Pandas metadata (which took some effort to get right!) and then not use it. Specifying the index is one of the few things that the parquet metadata was not able to do.
Those are the hard ones! For arrow output without a |
@martindurant Sorry - I should specify that I was skipping them for now. However, I will probably get them addressed today. |
|
Certainly, but they've caused me enough headaches in the past... Good luck! |
That is understandable. I will try to work out a clean/general way to work this in. |
|
@martindurant I think I might be using the wrong terminology, the inference logic I was referring to is the divisions: https://github.com/dask/dask/blob/master/dask/dataframe/io/parquet.py#L1122 |
|
@rjzamora I didn't look at the diff in detail (it's big :-)), but just wanted to check: do you think the changes you have been doing in pyarrow for getting and writing the metadata will prove sufficient for the implementation here in dask? |
|
@jorisvandenbossche Yes - my impression is that the basic features needed for dask are now available in pyarrow. There are still opportunities for improvement, but the highest-priority items have been addressed. |
|
That sounds good. Feel free to open JIRA issues on things that can be further improved or made easier with pyarrow while you encounter them! |
|
I anticipate that we will run into issues when reading statistics of derived datatypes like text and datetimes. I think that I raised a JIRA about this a while ago that is still open. I doubt that we'll get a fix in before the end of the week though, so I wouldn't hold things up for this. |
|
You are probably talking about https://issues.apache.org/jira/browse/ARROW-4139 ? |
|
Yes. That one.
|
This is a proof of concept around the proposal in #4329.
An engine is responsible for the following:
Engines are not responsible for ...
I've only done this for pyarrow reading so far. Feedback appreciated.