
Use parquet metadata to calculate len #7912

Closed
rjzamora wants to merge 8 commits into dask:main from rjzamora:metadata-len

Conversation

@rjzamora
Member

Dask does not currently save or leverage parquet-metadata statistics to calculate the length of a DataFrame collection. This PR modifies read_parquet to save the size of each partition (when this information is available and correct), and then uses it in DataFrame.__len__.

I am confident that we want to do something like this. However, I will mark this PR as a draft until we can agree on "where" the partition-size metadata should be stored. For now, I am adding it to collection_annotations, but it may also make sense to make it a DataFrameIOLayer attribute.

Member

@jsignell jsignell left a comment


I like the idea of putting it in collection_annotations

@rjzamora
Member Author

I like the idea of putting it in collection_annotations

Nice. One of my primary concerns is that these annotations are printed in the Layer html repr, and printing every partition length can become pretty verbose. Of course, the answer to that problem may just be to provide a mechanism for specifying annotations that shouldn't be displayed.

@mrocklin
Member

mrocklin commented Jul 19, 2021 via email

@rjzamora
Member Author

rjzamora commented Jul 19, 2021

Are we planning to track this information through other operations, or is
it only useful for len(dd.read_parquet(...)).

I'm still uncertain if length information is worth tracking through other operations. However, I can say for sure that the len(dd.read_parquet(...)) case is quite important for NVTabular, where we currently need to process the parquet metadata separately (and redundantly) to get the dataset size without calling len(ddf). The motivation comes from the fact that many DL dataloading APIs (for out-of-core data) require the user to specify the total length of the dataset. I am pretty annoyed by this requirement, but it's something we need to provide for now.

@martindurant
Member

  • I would certainly recommend tracking the length information! Some operations, like column select/assign or map, are guaranteed to preserve the row count. The total count would be nice to include in the graphical or text output (since it's free).
  • I think you are only using parquet statistics here? The row count per row group and globally (if there is _metadata) does not require column statistics. Statistics are optional, but row counts are mandatory. If you don't have _metadata, then of course you need to open the constituent files to get their counts (which is still faster than loading the data; this might be loaded and cached upon len).

I have not yet found the discussion where we go over the idea of storing dataframe-level attributes/metadata (which predated high-level graphs).

@rjzamora
Member Author

I think you are only using parquet statistics here? The row count per row group and globally (if there is _metadata) does not require column statistics. Statistics are optional, but row counts are mandatory.

This is a good point. The current implementation is using a statistics structure to collect the partition-wise row counts, but the row count is not actually a real "statistic". If there are no filters or index columns to deal with, the statistics list will not actually include any column-chunk information ("statistics") at all. With that said, we avoid parsing/organizing any of this metadata-based information when the user explicitly defines gather_statistics=False.

@martindurant
Member

we avoid parsing/organizing any of this metadata-based information when the user explicitly defines gather_statistics=False.

Understood - two slightly different things (parsing thrift footers versus constructing the statistics structure).

@jsignell
Member

I just came across a version of this issue that has a really nice comment from Tom. Might be worth considering some of these questions:

FWIW, I think that this is exactly the case where we'd first introduce length-aware DataFrame partitions. But the details are a bit tricky. If you're interested in pursuing this, I think we would need

  1. A proposal on how this will be stored on DataFrame / Series / Index (dask.array.Array stores ._chunks as a tuple)
  2. A way to pass through this information when creating these objects
  3. A proposal on how methods can use this information (e.g. __len__ can be an example). This will likely need to describe how things fall back to the common unknown-length case (is that automatic? Can that be configured to error?).
  4. A proposal for how methods can indicate that they're shape-preserving. e.g. DataFrame.rename(columns=...) will not affect the rows, so the lengths are preserved. But DataFrame[mask] would have to invalidate the rows.

So not insurmountable, but a good amount of work.

ref: #5633 (comment)

@jsignell jsignell linked an issue Jul 21, 2021 that may be closed by this pull request
@rjzamora
Member Author

rjzamora commented Jul 21, 2021

Thanks for digging this up, @jsignell! I totally agree with @TomAugspurger

  1. A proposal on how this will be stored on DataFrame / Series / Index (dask.array.Array stores ._chunks as a tuple)

This is exactly what I am hoping to decide on here. My original solution was to store the information at the Layer level in the collection_annotations attribute. I also suggested the possibility of attaching this information to the Layer object as a dedicated _partition_lens attribute. However, I am starting to believe that this particular type of information deserves to live as an optional _Frame attribute, e.g. _Frame._lens.

  2. A way to pass through this information when creating these objects

If we store the partition lengths in a _lens attribute, we need to update the _Frame, DataFrame, and Series constructors to accept this optional attribute. We would also need to add it to new_dd_object (since it is typically the “canonical” API for constructing a new _Frame collection in dask.dataframe).

Setting the length information this way becomes trivial in IO functions like read_parquet and from_pandas (you just pass the list into new_dd_object when it is available). For simple length-preserving operations, like assign, we can also modify elemwise to propagate these lengths (if/when they are known).

  3. A proposal on how methods can use this information (e.g. __len__ can be an example). This will likely need to describe how things fall back to the common unknown-length case (is that automatic? Can that be configured to error?).

Methods in the dask.dataframe API should be free to use _lens when the attribute is not set to None. For now, there are not many cases where we need this information (just __len__), and there are many cases where we do not want to require a dask collection to keep track of the length of every partition (shuffling, filtering, merging, etc.). So, we do not want to raise an error when the information is lost.

Note that there are probably other methods, besides __len__, that could make use of _lens. I could also imagine that many users would find a (fast) drop_empty_partitions method to be very useful.

  4. A proposal for how methods can indicate that they're shape-preserving. e.g. DataFrame.rename(columns=...) will not affect the rows, so the lengths are preserved. But DataFrame[mask] would have to invalidate the rows.

I think the primary concern is that we can propagate known lengths through elemwise methods, and some map_partitions operations. Therefore, I suspect that we will only need to add something like a preserve_partition_lens kwarg to map_partitions (which will indicate that _lens should be passed into the final new_dd_object call).
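A toy sketch of this flow, with stand-in names for _Frame / new_dd_object / elemwise (none of this is Dask's actual API): the lengths are passed in at construction when known, summed by the length query, and carried through row-count-preserving operations.

```python
# Hypothetical stand-ins for new_dd_object / __len__ / elemwise.
def new_collection(partitions, lens=None):
    return {"partitions": partitions, "_lens": lens}

def collection_len(coll):
    if coll["_lens"] is not None:
        return sum(coll["_lens"])                    # metadata path: free
    return sum(len(p) for p in coll["partitions"])   # fallback: compute

def elemwise(func, coll):
    # Elementwise operations preserve the row count, so the known
    # per-partition lengths carry over unchanged (or None stays None).
    parts = [[func(x) for x in p] for p in coll["partitions"]]
    return new_collection(parts, lens=coll["_lens"])

ddf = new_collection([[1, 2], [3, 4, 5]], lens=[2, 3])
assert collection_len(elemwise(lambda x: x + 1, ddf)) == 5
```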

meta=no_default,
enforce_metadata=True,
transform_divisions=True,
length_preserving=False,
Member

Nitpick, but the other kwargs are {verb}_{noun}; how would you feel about preserve_length?

Member Author

Makes sense. I have no idea what to call this - So, your opinion is useful, and not just a nitpick :)

)
self._meta = meta
self.divisions = tuple(divisions)
self._lens = lens
Member

Should there be any validation? Like, does it need to have length len(divisions) - 1?
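A minimal sketch of the validation being suggested (the function name is hypothetical): lens, if given, should have one entry per partition, i.e. one fewer than the number of division boundaries.

```python
def validate_lens(lens, divisions):
    """Check that lens has one entry per partition, or is None."""
    if lens is not None:
        npartitions = len(divisions) - 1
        if len(lens) != npartitions:
            raise ValueError(
                f"expected {npartitions} partition lengths, got {len(lens)}"
            )
    return lens

validate_lens([10, 20], (0, 10, 30))  # ok: two partitions, two lengths
```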

Member

@mrocklin mrocklin left a comment

Thank you for experimenting with this @rjzamora . I'm currently -1 on this approach. I think that it elevates a relatively small optimization to too high a level in the abstraction. For example, someone could easily say "well, I want to compute max" and then we have to add maxes everywhere. Same with uniqueness, emptiness, min-ness, etc. I think that we can still solve what you want to solve, but that we should find another way.

def __init__(self, dsk, name, meta, divisions=None):
# divisions is ignored, only present to be compatible with other
# objects.
def __init__(self, dsk, name, meta, divisions=None, lens=None):
Member

If we're going to add this, let's use the full word, "lengths", which I think will be clearer.

"""

def __init__(self, dsk, name, meta, divisions):
def __init__(self, dsk, name, meta, divisions, lens=None):
Member

I am currently -1 on putting this in the DataFrame constructor. I think that this is too niche a topic to be on the same level as meta/divisions. I think that we can find another way.

@mrocklin
Member

In general, I think that you should expect any change to the core metadata of dataframes on top of meta/divisions/graph to be met with extreme levels of scrutiny :)

@rjzamora
Member Author

Thanks for taking a look, @mrocklin! I am not happy with this "proposal" yet, so your comments are helpful.

In general, I think that you should expect any change to the core metadata of dataframes on top of meta/divisions/graph to be met with extreme levels of scrutiny :)

No worries. I absolutely expected a -1 from you :)

With that said, I am fairly certain that we do need to come up with a way to store/track this kind of information. So, I am doing my best to experiment with different solutions in an open-minded way.

Note that I locally implemented a Layer-centered solution that works fine, but I switched to the _Frame attribute when I realized this feature is very similar to Array._chunks, and would be much more useful at the dask.dataframe API level than at the HLG Layer level. I also realized that, even after finishing the required groundwork to ensure that all DataFrame-specific Layers are based on DataFrameLayer, we would probably be uncomfortable with partition-length information being tracked there as well. Overall, the partition-length information does not feel to me like it should be attached to the graph.

To be clear, I am very hesitant to change the _Frame/DataFrame/Series APIs. I think it is a bit obvious that collection-specific information needs to be attached to the "collection" object itself in some way, but I don't want it to live at the same level as meta/divisions/graph.

For example, someone could easy "well, I want to compute max" and then we have to add maxes everywhere. Same with uniqueness, emptiness, min-ness, etc.. I think that we can still solve what you want to solve, but that we should find another way.

I also had the same "slippery-slope" thought that users may want to track other "collection statistics" to optimize similar operations.

Perhaps a reasonable compromise here is to (1) change _lens into a more-general _partition_statistics attribute, and to (2) treat this attribute as a “second-class citizen”. That is, we could add explicit set/get methods to attach/access these optional statistics, and leave the optional attribute/information out of _Frame/DataFrame/Series initialization. This way, we would avoid any “public” API changes, but provide a formal mechanism/location for this type of information to be stored/propagated. I realize that this idea is likely to get a -1 as well, but maybe I’m getting a bit closer?

@jsignell
Member

Just as a note: Array has been doing a bit of this kind of pattern with cached_property. I personally would prefer to have _len as a cached_property of dataframe objects rather than having a grab bag of _partition_statistics.
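A minimal sketch of that pattern (class and attribute names are illustrative): the first len() call computes the count, later calls hit the cache, and an IO function that already knows the answer could pre-seed it.

```python
from functools import cached_property

class Frame:
    def __init__(self, partitions):
        self.partitions = partitions

    @cached_property
    def _len(self):
        # Fallback: count the rows (the expensive compute path).
        return sum(len(p) for p in self.partitions)

    def __len__(self):
        return self._len

# Normal path: first len() computes, subsequent calls hit the cache.
ddf = Frame([[1, 2], [3, 4, 5]])
assert len(ddf) == 5

# IO path: read_parquet already knows the counts from the footers and
# could pre-seed the cache, so len() never computes anything.
seeded = Frame([[1, 2], [3, 4, 5]])
seeded.__dict__["_len"] = 5
assert len(seeded) == 5
```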

@rjzamora
Member Author

@jsignell - I decided to experiment with your cached_property idea. Please feel welcome to advise :)

@pyrito

pyrito commented Dec 17, 2021

Is this PR still active? Are there plans to get this merged in anytime soon? @rjzamora

@jsignell
Member

I suspect that this got superseded by the discussions around a high-level expression system for encapsulating all the dataframe metadata (#7933).



Development

Successfully merging this pull request may close these issues.

Use parquet metadata to get length

5 participants