
WIP - Refactor Parquet reader#4336

Closed
mrocklin wants to merge 70 commits into dask:master from mrocklin:parquet-refactor

Conversation

@mrocklin
Member

@mrocklin mrocklin commented Dec 29, 2018

This is a proof of concept around the proposal in #4329

An engine is responsible for the following:

  • Provides a function that, given a path argument and a filesystem, returns a list of objects representing row groups
  • Provides a function that turns those row groups into dataframe-like objects
  • Optionally gathers statistics about those row groups, which the core project can use to remove empty partitions or find an index

Engines are not responsible for ...

  • Finding the index (they just provide statistics)
  • Filtering out partitions (they just provide statistics)
  • Removing empty partitions (they just provide statistics)
  • Returning a series or dataframe (they always return dataframes for simplicity)

I've only done this for pyarrow reading so far. Feedback appreciated.
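The engine contract described above could be sketched roughly as follows. This is purely illustrative: the class and method names (`SketchEngine`, `read_metadata`, `read_partition`) and the part/statistics layout are invented for this sketch and are not dask's actual API.

```python
class SketchEngine:
    """Hypothetical engine following the contract above (names invented)."""

    @staticmethod
    def read_metadata(fs, paths):
        # 1. Given paths and a filesystem, return objects representing
        #    row groups, plus (optionally) per-row-group statistics.
        parts = [{"piece": path, "kwargs": {}} for path in paths]
        statistics = None  # or a list like [{'num-rows': ..., 'columns': [...]}]
        return parts, statistics

    @staticmethod
    def read_partition(fs, part, columns):
        # 2. Turn one row-group object into a dataframe-like object.
        #    Always returns a dataframe, never a series.
        raise NotImplementedError
```

Everything else (index selection, filtering, dropping empty partitions) would live in core and operate only on the parts and statistics the engine returns.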

This is a proof of concept around the proposal in
dask#4329

The different engines are responsible for returning a reader function, a list
of tokens/kwargs to pass to that reader function, and an optional list of
statistics.  We pull out all other book-keeping to a master function to avoid
the duplication of tricky logic.

I've only done this for pyarrow so far.  Feedback appreciated.
@mrocklin
Member Author

@martindurant @xhochy if either of you have time to look over the changes here I would appreciate it.

My goal here is to propose a design around the dask.dataframe.io.parquet module which removes burden from engine authors and places as much shared logic into core as possible.

@martindurant
Member

Is this independent of #4335 ?

@mrocklin
Member Author

That change can stand on its own, but it was a small step en route to this. Assuming limited review time I would ask people to focus on this PR, and the second commit in particular.

@mrocklin
Member Author

I've updated the header comment and also added in some additional logic around finding sorted columns/index and removing empty partitions.

Feedback would be appreciated. My hope is that the pyarrow implementation is now a bit simpler.

Member

@martindurant martindurant left a comment

A few thoughts for you. Generally, the changes are fine - but I'm not yet convinced how much simpler this layout is. That may become clear when the fastparquet arm is also done.


 def _read_pyarrow(fs, fs_token, paths, columns=None, filters=None,
-                  categories=None, index=None, infer_divisions=None):
+                  categories=None, index=None, gather_statistics=None):
Member

Part of the apparent complexity of the module is, I think, the lack of docstrings on all these functions. This may be a good time to add at least one-liners for the helper functions.

    }
    if gather_statistics:
        # Read from _metadata file
        if dataset.metadata and dataset.metadata.num_row_groups == len(dataset.pieces):  # one rg per piece
Member

I wasn't aware that the metadata was exposed like this - is it exactly the same as a _metadata file?

Member Author

It appears to have all of the information that we need and responds quickly, so I suspect so.

        # Read from each individual piece (possibly slow)
        else:
            row_groups = [
                piece.get_metadata(lambda fn: pq.ParquetFile(fs.open(fn, mode='rb'))).row_group(0)
Member

So this is gather_statistics=True, but no metadata? I wonder if a speed warning here might be appropriate if there are many files, or if we find that each has a large number of columns.

Member Author

Yes. I'm not sure if we actually want to do this. My guess is that we want to do something like gather statistics if there is a metadata file, or maybe if we're using a local file system, but otherwise don't unless the user explicitly tells us to.

Contributor

I would really add a warning here. From my personal experience, when you have a medium-sized Parquet dataset on a (cloud) object store like AWS S3 or Azure Blob Storage, this will be unusably slow. It is not easy for a first-time user to understand the problem, so they will get frustrated and abandon dask/parquet.

Member

Maybe could have a timer that warns if the process is taking longer than some limit?
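A rough sketch of that timer idea, assuming a hypothetical per-piece statistics fetch (`pieces`, `get_stats`, and the warning text are all invented for illustration):

```python
import time
import warnings


def gather_with_timer(pieces, get_stats, limit=10.0):
    """Collect statistics piece by piece, warning once if it gets slow.

    `pieces` and `get_stats` are hypothetical stand-ins for the row-group
    objects and the per-piece metadata fetch discussed above.
    """
    start = time.time()
    warned = False
    results = []
    for piece in pieces:
        results.append(get_stats(piece))
        if not warned and time.time() - start > limit:
            warnings.warn(
                "Gathering Parquet statistics is taking a long time; "
                "consider writing a _metadata file or disabling "
                "statistics gathering"
            )
            warned = True
    return results
```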

Member Author

I propose a statistics=None keyword that can be

  • True: get statistics, even if no metadata file is present
  • False: don't get statistics, even if a metadata file is present
  • None: get statistics only if a metadata file is present

In this way users need to explicitly opt in to expensive behavior.
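The proposed three-state keyword reduces to a small decision, sketched here as an illustrative helper (not actual dask code):

```python
def should_gather_statistics(gather_statistics, has_metadata_file):
    """Resolve the proposed three-state flag.

    True  -> gather statistics, even with no _metadata file
    False -> skip statistics, even if a _metadata file is present
    None  -> gather only when a _metadata file makes it cheap
    """
    if gather_statistics is None:
        return has_metadata_file
    return gather_statistics
```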

Member Author

If we start focusing on Arrow and adopt a policy like this then it would be good to be able to write metadata files with arrow. Reported upstream in ARROW-1983.

Member Author

cc @wesm

                    'max': column.statistics.max,
                    'null_count': column.statistics.null_count})
            s['columns'].append(d)
        stats.append(s)
Member

Side note: can we make use of the a priori knowledge of the number of rows somehow, for len(df) or map_partitions(len)?

Member Author

I think that this would add too much complexity. If you have suggestions on how to do it well then those would be welcome, though probably in another issue.

Member

Coming back to this from before...
I think an implementation would amount to either a subclass of DataFrame which knows how to do len or, probably better, an optimisation specific to parquet when a len is requested - along the lines of what the previous column-selection was doing.

Member Author

I have no objection in principle to tracking more metadata, though I do think that it will be hard to do in general.

Would you mind if we defer this conversation to future work?

Member

Certainly, was just scrolling through the comments and noticed this. I think generally it would be very useful to Dask to pass on more information where possible, to avoid doing extra work.
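For reference, a sketch of how per-row-group statistics could answer len(df) from metadata alone. The `fast_len` helper and the `num-rows` key are invented for illustration; they are not part of dask:

```python
def fast_len(statistics):
    """Return the total row count from metadata, or None to signal that a
    real computation is needed (the statistics format is illustrative)."""
    if not statistics or any("num-rows" not in s for s in statistics):
        return None
    return sum(s["num-rows"] for s in statistics)
```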

    if isinstance(columns, str):
        df = read_parquet(path, [columns], filters, categories, index,
                          storage_options, engine, infer_divisions)
        return df[columns]
Member

This becomes the way to make a series?
The previous asserts on columns being a list could be dropped I think.

Member Author

Yes. It seems simpler and centralizes a bunch of logic that was way further down. This change is isolated to the first commit, which has many more deletions than additions.
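The single-column dispatch boils down to this pattern, shown here with a stand-in reader (`read_frame` and `read` are invented for illustration; the real function is read_parquet):

```python
import pandas as pd


def read_frame(columns):
    # Stand-in for the real read_parquet; always returns a DataFrame.
    return pd.DataFrame({c: [0, 1] for c in columns})


def read(columns):
    # A string column is wrapped in a list, read as a one-column
    # DataFrame, then sliced back down to a Series.
    if isinstance(columns, str):
        return read_frame([columns])[columns]
    return read_frame(columns)
```

This way the engines always return dataframes, and series-vs-dataframe handling lives in one place at the top.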


paths = sorted(paths, key=natural_sort_key) # numeric rather than glob ordering

func, meta, statistics, parts = read(fs, fs_token, paths, columns=columns, filters=filters,
Member

Do we actually use the fs token? Seems like open_files would be cleaner, if possible, and I see the name, above, doesn't use it.

Member Author

I don't know.

        divisions = out[0]['divisions']
        index = out[0]['name']
    elif len(out) > 1:
        warnings.warn("Multiple sorted columns found: " + ", ".join(o['name'] for o in out))
Member

i.e., the user should ideally pick one? If they have picked one, then we don't care that multiple possible index columns exist. Conversely, they may have said no index, or the given index column is not sorted, so there being exactly one sorted column shouldn't mean it automatically becomes the index.

Member Author

Yes, ideally we listen to the index= keyword here. I haven't gotten to this yet.


def read_parquet_part(func, fs, meta, part, columns, index, kwargs):
    """ Read a part of a parquet dataset """
    df = func(fs, part, columns, **kwargs)
Member

Comment here to point to the two possible values of func, which would otherwise be tricky to divine here.
If the file is again opened within the pyarrow function (it certainly would be for fastparquet), then OpenFiles would be better than passing around the filesystem (@jcrist , agree on this point?).

Member Author

What do you mean by the two possible values of func? Which values are you referring to?

Member

One for pyarrow, one for fastparquet

return out


def sorted_columns(statistics):
Member

Some helper functions are hidden, some are not. Perhaps all functions should be normal and documented?
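For context, the kind of logic a sorted_columns helper implements can be sketched as follows: a column qualifies as sorted when its per-row-group [min, max] ranges are ascending and non-overlapping, and its divisions are the row-group minimums plus the final maximum. The function name and statistics layout here are illustrative, not dask's actual code:

```python
def sorted_columns_sketch(statistics):
    """Find columns whose per-row-group ranges are ascending and
    non-overlapping; report their division points.  (The statistics
    layout used here is illustrative.)"""
    out = []
    if not statistics:
        return out
    for i, col in enumerate(statistics[0]["columns"]):
        divisions = [col["min"]]
        maximum = col["max"]
        ok = col["min"] is not None
        for stats in statistics[1:]:
            c = stats["columns"][i]
            if c["min"] is None or c["min"] < maximum:
                ok = False  # ranges overlap or stats missing: not sorted
                break
            divisions.append(c["min"])
            maximum = c["max"]
        if ok:
            divisions.append(maximum)
            out.append({"name": col["name"], "divisions": divisions})
    return out
```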

@mrocklin
Member Author

A few thoughts for you. Generally, the changes are fine - but I'm not yet convinced how much simpler this layout is.

We're removing a lot of burden from the backends. Complexity isn't significantly reduced overall; it's just moved to a central location and hopefully standardized.

That may become clear when the fastparquet arm is also done.

Do you have any interest in engaging on this? I could use help.

@martindurant
Member

Do you have any interest in engaging on this? I could use help.

Interest, yes, but I don't want to promise time - I'll be planning effort allocation in the next week or so.

@mrocklin
Member Author

Do you have any interest in engaging on this? I could use help.

Interest, yes, but I don't want to promise time - I'll be planning effort allocation in the next week or so.

Alternatively, maybe this could be a time to reconsider what we support long term?

My inclination here is to see what I can clean up to make things more cleanly extensible, targeting the pyarrow interface as the first customer. I suspect that we'll run into issues with pyarrow, which I plan to push on or work on upstream to see if we can get things working smoothly. If that goes well and we can get all of the support we need from Arrow then I'd be inclined to sunset our support of fastparquet for maintenance-cost reasons. Thoughts on this?

Also, for transparency, I'm working on this because I expect that NVIDIA will have a cudf parquet reader prototype in a few weeks, and am looking into how expensive it would be to hook it up to dask.dataframe.

@martindurant
Member

@seibert, @pzwang , you may have an opinion on how much effort fastparquet maintenance is worth.

@mrocklin
Member Author

mrocklin commented Apr 1, 2019

So, what happens if I get things nicely working with Arrow, but not yet with Fastparquet? Do we wait on this PR until someone implements the fastparquet side of this rewrite, or do we merge without fastparquet support?

@mrocklin
Member Author

In https://github.com/mrocklin/dask/pulls#issuecomment-501911299 @rjzamora says

I have worked through many of the tests for the pyarrow engine, and currently have only 4 tests failing. Here is a grep for FAILED in the pytest output:

100:dask/dataframe/io/tests/test_parquet.py::test_categories[pyarrow] FAILED
102:dask/dataframe/io/tests/test_parquet.py::test_timestamp_index[pyarrow] FAILED
119:dask/dataframe/io/tests/test_parquet.py::test_columns_name[pyarrow] FAILED
160:dask/dataframe/io/tests/test_parquet.py::test_datasets_timeseries FAILED
368:= 4 failed, 78 passed, 65 skipped, 4 xfailed, 1 xpassed, 17 warnings in 7.88 seconds =

A few notes:

  • The test_timestamp_index and test_datasets_timeseries tests will pass if you use something like assert_eq(ddf1.compute(), ddf2.compute()).
  • This line of test_categories is failing, because cats_set has a different ordering than the truth value.
  • test_columns_name is failing because df.columns.name is not making it through the round trip.
  • I am skipping the tests for append=True
  • I got many of the other tests to pass just by specifying the index when reading. Also, in cases where the dataframe's index has no name before the write, it will be given the name 'index' by default (meaning that the default index name will be 'index' instead of None after it is read in). This PR largely assumes that the index will be preserved as a column in to_parquet, so it is up to the user to specify the correct index to read_parquet (i.e., I am not trying to do anything fancy to make the index preservation invisible to the user).

@mrocklin @martindurant Please let me know if we want/need the behavior of to_parquet and read_parquet to be more consistent before and after the refactor. I am mostly interested to know how appropriate/inappropriate it is to significantly reduce the reader's ability to automatically detect the index.

@wesm
Contributor

wesm commented Jun 14, 2019

I think it would be better to be explicit about the index rather than automatically detect. The exception would be if you want to set Dask-specific metadata in the file footer to indicate which field should be used as the index

@martindurant
Member

I think it would be better to be explicit about the index rather than automatically detect.

I know @mrocklin has also suggested this in the past, but it seems a shame to write the Pandas metadata (which took some effort to get right!) and then not use it. Specifying the index is one of the few things that the parquet metadata was not able to do.

I am skipping the tests for append=True

Those are the hard ones! For arrow output without a _metadata file, it should be equivalent just to using an offset in the data file names.

@rjzamora
Member

Those are the hard ones! For arrow output without a _metadata, should be equivalent just to using an offset in the data file names.

@martindurant Sorry - I should specify that I was skipping them for now. However, I will probably get them addressed today.

@martindurant
Member

Certainly, but they've caused me enough headaches in the past... Good luck!

@rjzamora
Member

I know @mrocklin has also suggested this in the past, but it seems a shame to write the Pandas metadata (which took some effort to get right!) and then not use it. Specifying the index is one of the few things that the parquet metadata was not able to do.

That is understandable. I will try to work out a clean/general way to work this in.

@wesm
Contributor

wesm commented Jun 14, 2019

@martindurant I think I might be using the wrong terminology, the inference logic I was referring to is the divisions:

https://github.com/dask/dask/blob/master/dask/dataframe/io/parquet.py#L1122

@jorisvandenbossche
Member

@rjzamora I didn't look at the diff in detail (it's big :-)), but just wanted to check: do you think the changes you have been doing in pyarrow for getting and writing the metadata will prove sufficient for the implementation here in dask?
The thing is that Arrow aims to close the queue for the next 0.14.0 release next week. So if there are additional changes needed in pyarrow to get this branch working, it would be good to work that out this week.

@rjzamora
Member

@jorisvandenbossche Yes - my impression is that the basic features needed for dask are now available in pyarrow. There are still opportunities for improvement, but the highest-priority items have been addressed.

@jorisvandenbossche
Member

That sounds good. Feel free to open JIRA issues on things that can be further improved or made easier with pyarrow while you encounter them!

@mrocklin
Member Author

I anticipate that we will run into issues when reading statistics of derived datatypes like text and datetimes. I think that I raised a JIRA about this a while ago that is still open. I doubt that we'll get a fix in before the end of the week though, so I wouldn't hold things up for this.

@jorisvandenbossche
Member

You are probably talking about https://issues.apache.org/jira/browse/ARROW-4139 ?

@mrocklin
Member Author

mrocklin commented Jun 17, 2019 via email
