[POC] Introduce partition_metadata attribute to DataFrame #9473
Conversation
ian-r-rose
left a comment
Sorry to take so long to read through this @rjzamora! I'm very much in favor of the general approach you've taken here. I think that the relatively simple (one could say impoverished) set of metadata tracked on dask dataframes prevents a number of possible improvements, and it's due for a redesign.
Most of my comments are in the vein of trying to make an API that is difficult to get wrong, while retaining the richness that we want. I realize this is a POC, so excuse me if my comments are overly detailed, or are towards things that you deliberately decided to defer until later.
dask/dataframe/core.py
Outdated
```python
# it is also partitioned by ("A", "B", ...)
if _by[: len(group)] == group:
    return self.partition_metadata.partitioning[group]
return False
```
Rather than returning False for a non-string, non-list, or non-tuple type, I think it would make more sense to raise a TypeError and document the accepted input types.
dask/dataframe/core.py
Outdated
```
- <column-name>: DataFrame of stats for the column
- Required DataFrame columns: "min" and "max"
```
Why use a dataframe? For something this small, I'd imagine a namedtuple or dataclass might be easier. Though perhaps I'm still using the old way of thinking where we try to avoid using numpy/pandas for graph logic.
Yeah, I was certainly unsure about this. My only insight from parsing statistics in the parquet world is that it is much faster to work with column statistics when they are represented in an array-like format. For example, Parquet effectively gives you a distinct `{"col0": {"min": <val>, "max": <val>}}` dictionary for each row-group, and this makes it extremely slow to calculate divisions or apply filters for many-partition datasets. In order to aggregate multiple row-groups into a dask partition, we typically start by converting to a pandas DataFrame.
I should note that I'm not particularly fond of the current design here just yet. I was certainly aiming for rough changes that someone like you would help me improve :)
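To illustrate the point about array-like formats (this is an illustrative sketch, not dask code): per-row-group `{"min": ..., "max": ...}` dictionaries can be stacked into a single DataFrame, turning per-partition aggregation into one vectorized groupby instead of a Python loop over dictionaries.

```python
import pandas as pd

# Hypothetical per-row-group statistics for one column
row_group_stats = [
    {"min": 0, "max": 4},
    {"min": 5, "max": 9},
    {"min": 10, "max": 14},
    {"min": 15, "max": 19},
]
# Suppose row-groups 0-1 form partition 0, and row-groups 2-3 form partition 1
partition_of_row_group = [0, 0, 1, 1]

# Stack the dicts into an array-like DataFrame, then aggregate per partition
stats = pd.DataFrame(row_group_stats)
per_partition = stats.groupby(pd.Series(partition_of_row_group)).agg(
    {"min": "min", "max": "max"}
)
```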
dask/dataframe/core.py
Outdated
```python
def __init__(
    self,
    statistics: dict | None = None,
```
This interface smells a bit off to me. Rather than accept a single dictionary with a very specific structure, why not just use constructor arguments? It's unfortunate for greenfield API design to already have magic values (`"__num_rows__"`).
Yes - The "__num_rows__" magic key needs to go!
What I am trying to do here is come up with a way to both separate and couple the various types of partition statistics at the same time. This is because we want to be able to access both column and num-rows statistics individually, but we also want to be able to use the same callback (maybe delayed?) function to load multiple statistics "lazily" at the same time.
I am still struggling a bit to come up with an elegant solution for this.
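One possible shape for the "separate but coupled" design described above: explicit constructor arguments (no magic `"__num_rows__"` key), plus a single optional loader callback that materializes several statistics lazily in one pass. All names here are hypothetical sketches, not dask API.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class PartitionStatistics:
    # Statistics are separate attributes, so each can be accessed individually
    column_stats: Optional[dict] = None  # per-column min/max, if known
    num_rows: Optional[list] = None      # per-partition row counts, if known
    # ...but they share one lazy loader, so both can be filled in together
    loader: Optional[Callable] = None

    def materialize(self):
        """Run the shared loader once to fill in any missing statistics."""
        if self.loader is not None:
            cols, rows = self.loader()
            if self.column_stats is None:
                self.column_stats = cols
            if self.num_rows is None:
                self.num_rows = rows
            self.loader = None


def _load_from_storage():
    # Stand-in for e.g. reading parquet footer metadata in a single pass
    return {"x": {"min": 0, "max": 9}}, [5, 5]


stats = PartitionStatistics(loader=_load_from_storage)
stats.materialize()
```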
dask/dataframe/core.py
Outdated
```python
):
    self._statistics = statistics or {}

def copy(self, keys: set | None = None) -> PartitionStatistics:
```
Perhaps `columns` instead of `keys`? `key` is already a bit overloaded.
dask/dataframe/core.py
Outdated
```python
@property
def available_stats(self) -> set:
    """Return all available partition-statistic keys"""
    return self.known_stats | self.lazy_stats
```
Isn't this the same as `set(self._statistics.keys())`?
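The equivalence holds as long as `known_stats` and `lazy_stats` partition the dictionary's keys. A sketch (hypothetical names, not dask code) where "lazy" means the stored value is a callable:

```python
class PartitionStatistics:
    def __init__(self, statistics=None):
        self._statistics = statistics or {}

    @property
    def known_stats(self):
        # Statistics already materialized as concrete values
        return {k for k, v in self._statistics.items() if not callable(v)}

    @property
    def lazy_stats(self):
        # Statistics stored as callbacks to be loaded on demand
        return {k for k, v in self._statistics.items() if callable(v)}

    @property
    def available_stats(self):
        # Equivalent to set(self._statistics) under the convention above
        return self.known_stats | self.lazy_stats


stats = PartitionStatistics({"num_rows": [1, 2], "column_stats": lambda: {}})
```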
dask/dataframe/core.py
Outdated
```python
def _divisions(self):
    # _divisions compatibility
    raise FutureWarning(
        "_Frame._divisions is deprecated. Please use _Frame.divisions"
```
This future warning seems wrong to me?
dask/dataframe/core.py
Outdated
```python
    part_sizes = self.partition_metadata.get_stats({"__num_rows__"})[
        "__num_rows__"
    ]
    if part_sizes:
        return sum(part_sizes)
except KeyError:
    pass
```
I think this should just be an optional top-level attribute on the metadata, rather than keying into a dictionary with a magic name.
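A sketch of that suggestion: expose row counts as an optional attribute on the metadata object, so absence is an explicit `None` rather than a `KeyError` on a magic key. Names here are hypothetical.

```python
class PartitionMetadata:
    def __init__(self, num_rows=None):
        # List of per-partition sizes, or None when unknown
        self.num_rows = num_rows


def total_rows(metadata):
    # No try/except KeyError needed; absence is an explicit None
    if metadata.num_rows is not None:
        return sum(metadata.num_rows)
    return None
```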
dask/dataframe/core.py
Outdated
```python
# Use partition statistics to check if new index is already sorted
if not pre_sorted and divisions is None:
    try:
```
This is very exciting to see.
From an API standpoint, I'd like to have a way to determine if a partition is sorted without possibly raising a KeyError in normal usage. That is to say, I think this is a totally reasonable request of the metadata, so it shouldn't raise a KeyError.
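One way to meet that request: a three-valued answer that returns `True`/`False` when sortedness is known and `None` when the statistics are simply absent, so normal usage never raises. The function and key names below are hypothetical.

```python
def is_sorted_by(statistics: dict, column: str):
    """Return True/False if sortedness of ``column`` is known, else None."""
    sorted_columns = statistics.get("sorted_columns")
    if sorted_columns is None:
        return None  # nothing known; not an error
    return column in sorted_columns


known = {"sorted_columns": {"timestamp"}}
```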
dask/dataframe/core.py
Outdated
```python
meta=no_default,
out=None,
transform_divisions=True,
partition_metadata=None,
```
Who is responsible for making sure that `meta` and `partition_metadata` are in sync? For more internal APIs such as this, I'd probably want to just replace `meta` with `partition_metadata`.
dask/dataframe/core.py
Outdated
```diff
-def new_dd_object(dsk, name, meta, divisions, parent_meta=None):
+def new_dd_object(dsk, name, meta, divisions, parent_meta=None, **metadata_kwargs):
```
This looks like a great opportunity for consolidation of kwargs into a single metadata object (I suspect that's on your mind here, but out-of-scope for this POC):

```python
def new_dd_object(dsk, name, partition_metadata):
```

---

Removes unnecessary code from `dask_cudf.core._Frame` that is already handled in the super-class (`dask.dataframe.core._Frame`). By removing the unnecessary `__init__` logic from `dask_cudf`, we can avoid breakages from upstream changes like dask/dask#9473.

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #12001
This is a rough POC intended to illustrate how new `PartitionMetadata` and `PartitionStatistics` classes can be used to accomplish a few impactful goals in Dask-DataFrame.

The Goals:
- Consolidate partition metadata (`meta` and `divisions`) in one place (simplifying future expansion and possible movement into HLG/Layer)
- Track whether partitions are sorted even when we do not know `divisions`. Motivation: "Track whether DataFrame partitions are sorted, even if divisions are unknown?" #9425
- Track partition sizes for `len` #7912

If we ultimately decide that a change like this makes sense, I suggest that we break the work into three distinct stages:
1. Move `meta` and `divisions` management into a new/distinct "partition_metadata" attribute using a new `PartitionMetadata` class (specific attribute and class names are up for debate)
2. Add `partitioned_by` functionality to `PartitionMetadata` (and therefore `DataFrame`/`Series`)
3. Add `PartitionStatistics` support to `PartitionMetadata`

TODO: Add a clearer breakdown of the proposed design changes, and include some explicit user-code examples.
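In the spirit of the TODO above, here is a hedged sketch of what user-facing code could look like under this proposal, using minimal stand-in classes (not dask). The attribute and method names (`partition_metadata`, `partitioned_by`, `num_rows`) mirror the POC but are illustrative, not a stable API.

```python
class PartitionMetadata:
    def __init__(self, divisions, partitioning=None, num_rows=None):
        self.divisions = divisions              # today's divisions, relocated
        self.partitioning = partitioning or {}  # e.g. {("date",): True}
        self.num_rows = num_rows                # optional per-partition sizes


class DataFrame:
    def __init__(self, partition_metadata):
        # All per-partition metadata lives in one place
        self.partition_metadata = partition_metadata

    def partitioned_by(self, by):
        # Normalize to a tuple and consult the metadata (no magic keys)
        key = (by,) if isinstance(by, str) else tuple(by)
        return self.partition_metadata.partitioning.get(key, False)

    def __len__(self):
        # Cheap when row-count statistics are already known
        if self.partition_metadata.num_rows is not None:
            return sum(self.partition_metadata.num_rows)
        raise NotImplementedError("would fall back to computing lengths")


ddf = DataFrame(
    PartitionMetadata(
        divisions=(0, 50, 100),
        partitioning={("date",): True},
        num_rows=[50, 50],
    )
)
```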