
[POC] Introduce partition_metadata attribute to DataFrame#9473

Draft
rjzamora wants to merge 34 commits intodask:mainfrom
rjzamora:partition-stats

Conversation

@rjzamora (Member) commented Sep 8, 2022

This is a rough POC intended to illustrate how new PartitionMetadata and PartitionStatistics classes can be used to accomplish a few impactful goals in Dask-DataFrame. The goals:

  1. Isolate DataFrame-collection metadata (meta and divisions) in one place (simplifying future expansion and possible movement into HLG/Layer)
  2. Add a mechanism to track column partitioning beyond the conventional divisions. Motivation: Track whether DataFrame partitions are sorted, even if divisions are unknown? #9425
  3. Add a mechanism to track partition-wise statistics (e.g. partition lengths and min/max statistics for each column). Motivation: Add partition lengths to DataFrame metadata. #5633, Use parquet metadata to get length #6387, Use parquet metadata to calculate len #7912

If we ultimately decide that a change like this makes sense, I suggest that we break the work into three distinct stages:

  1. Move meta and divisions management into a new/distinct "partition_metadata" attribute using a new PartitionMetadata class (specific attribute and class names are up for debate)
  2. Add partitioned_by functionality to PartitionMetadata (and therefore DataFrame/Series)
  3. Add mechanism to track partition statistics to PartitionMetadata

TODO: Add a clearer breakdown of the proposed design changes, and include some explicit user-code examples.
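To make the proposal concrete, here is a minimal sketch of what a consolidated partition-metadata container might look like. All names (`PartitionMetadata`, `partitioned_by`, `statistics`) are assumptions based on the stages listed above, not the actual PR implementation:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PartitionMetadata:
    # Hypothetical container consolidating DataFrame-collection metadata.
    # `meta` would normally be an empty pandas object; it is left untyped
    # here to keep the sketch dependency-free.
    meta: object = None
    # Division boundaries, or a tuple of Nones when unknown
    divisions: tuple = (None, None)
    # Columns each partition is known to be partitioned by (stage 2)
    partitioned_by: tuple = ()
    # Per-partition statistics, e.g. {"num_rows": [100, 98, 102]} (stage 3)
    statistics: dict = field(default_factory=dict)

    @property
    def npartitions(self) -> int:
        # Number of partitions is always one less than the division count
        return len(self.divisions) - 1


pm = PartitionMetadata(divisions=(0, 50, 100), statistics={"num_rows": [50, 50]})
```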

@rjzamora rjzamora added dataframe enhancement Improve existing functionality or make things work better labels Sep 8, 2022
@github-actions github-actions bot added the io label Sep 8, 2022
@ian-r-rose (Collaborator) left a comment

Sorry to take so long to read through this @rjzamora! I'm very much in favor of the general approach you've taken here. I think that the relatively simple (one could say impoverished) set of metadata tracked on dask dataframes prevents a number of possible improvements, and it's due for a redesign.

Most of my comments are in the vein of trying to make an API that is difficult to get wrong, while retaining the richness that we want. I realize this is a POC, so excuse me if my comments are overly detailed, or are towards things that you deliberately decided to defer until later.

# it is also partitioned by ("A", "B", ...)
if _by[: len(group)] == group:
return self.partition_metadata.partitioning[group]
return False
Collaborator:

Rather than returning False for a non-string, non-list, or non-tuple type, I think it would make more sense to raise a TypeError and document the reasonable input types.
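A sketch of what that suggestion could look like, as a hypothetical standalone helper (the function name and normalization to a tuple are assumptions, not code from the PR):

```python
def validate_partitioning_key(by):
    # Normalize a partitioning key to a tuple of column names,
    # raising TypeError for unsupported input instead of returning False.
    if isinstance(by, str):
        return (by,)
    if isinstance(by, (list, tuple)):
        return tuple(by)
    raise TypeError(
        f"Expected str, list, or tuple of column names; got {type(by).__name__}"
    )
```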

Comment on lines +321 to +322
- <column-name>: DataFrame of stats for the column
- Required DataFrame columns: "min" and "max"
Collaborator:

Why use a dataframe? For something this small, I'd imagine a namedtuple or dataclass might be easier. Though perhaps I'm still using the old way of thinking where we try to avoid using numpy/pandas for graph logic.
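The dataclass alternative being suggested might look something like this (a sketch; the `ColumnStats` name and layout are assumptions):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ColumnStats:
    # Hypothetical per-partition min/max record for one column,
    # as a lighter-weight alternative to a per-column DataFrame of stats.
    min: object
    max: object


# One ColumnStats per partition, keyed by column name
stats = {"col0": [ColumnStats(0, 9), ColumnStats(10, 19)]}
```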

Member Author:

Yeah, I was certainly unsure about this. My only insight from parsing statistics in the parquet world is that it is much faster to do things with the column statistics when they are represented in an array-like format. For example, Parquet effectively gives you a distinct {"col0": {"min": <val>, "max": <val>}} dictionary for each row-group, and this makes it extremely slow to calculate divisions or apply filters for many-partition datasets. In order to aggregate multiple row-groups into a dask partition, we typically start by converting to a pandas DataFrame.
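The aggregation pattern being described can be sketched with toy data: convert the per-row-group dictionaries into a DataFrame, then reduce groups of row-groups into partitions with vectorized operations (the grouping of two row-groups per partition is an illustrative assumption):

```python
import pandas as pd

# One {"min": ..., "max": ...} dict per parquet row-group (toy data)
row_group_stats = [
    {"min": 0, "max": 4},
    {"min": 5, "max": 9},
    {"min": 10, "max": 14},
    {"min": 15, "max": 19},
]

# Tabular form enables vectorized reductions instead of a Python loop
df = pd.DataFrame(row_group_stats)

# Aggregate pairs of row-groups into dask partitions
groups = df.index // 2
partition_stats = df.groupby(groups).agg({"min": "min", "max": "max"})
```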

I should note that I'm not particularly fond of the current design here just yet. I was certainly aiming for rough changes that someone like you would help me improve :)


def __init__(
self,
statistics: dict | None = None,
Collaborator:

This interface smells a bit off to me. Rather than accept a single dictionary with very specific structure, why not just use constructor arguments? It's unfortunate for greenfield API design to already have magic values ("__num_rows__").
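The explicit-argument constructor being suggested might look like the following sketch (argument names `num_rows` and `column_stats` are hypothetical):

```python
class PartitionStatistics:
    # Sketch of an explicit-argument constructor, avoiding a single
    # dict with magic keys like "__num_rows__".
    def __init__(self, num_rows=None, column_stats=None):
        # Optional list of per-partition row counts
        self.num_rows = num_rows
        # Optional mapping of column name -> per-partition stats
        self.column_stats = column_stats or {}


ps = PartitionStatistics(num_rows=[3, 4])
```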

Member Author:

Yes - The "__num_rows__" magic key needs to go!

What I am trying to do here is come up with a way to both separate and couple the various types of partition statistics at the same time. This is because we want to be able to access both column and num-rows statistics individually, but we also want to be able to use the same callback (maybe delayed?) function to load multiple statistics "lazily" at the same time.

I am still struggling a bit to come up with an elegant solution for this.
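One possible shape for the "separate but coupled" requirement is a single loader callback that materializes every statistic at once, cached so that individual accesses share one load. This is a sketch of the idea under discussion, not the PR's implementation:

```python
class LazyStatistics:
    # Sketch: one callback loads several statistics together, but each
    # statistic remains individually accessible afterwards.
    def __init__(self, loader):
        self._loader = loader  # () -> dict of all statistics
        self._cache = None

    def get(self, key):
        if self._cache is None:
            # Load everything lazily, exactly once
            self._cache = self._loader()
        return self._cache[key]


calls = []


def load_all():
    # Stand-in for e.g. a parquet-metadata scan
    calls.append(1)
    return {"num_rows": [10, 20], "col0": {"min": 0, "max": 5}}


stats = LazyStatistics(load_all)
```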

):
self._statistics = statistics or {}

def copy(self, keys: set | None = None) -> PartitionStatistics:
Collaborator:

Perhaps columns instead of keys? key is already a bit overloaded.

@property
def available_stats(self) -> set:
"""Return all available partition-statistic keys"""
return self.known_stats | self.lazy_stats
Collaborator:

Isn't this the same as set(self._statistics.keys())?

def _divisions(self):
# _divisions Compatability
raise FutureWarning(
"_Frame._divisions is depracated. " "Please use _Frame.divisions"
Collaborator:

This future warning seems wrong to me?

Comment on lines +4748 to +4754
part_sizes = self.partition_metadata.get_stats({"__num_rows__"})[
"__num_rows__"
]
if part_sizes:
return sum(part_sizes)
except KeyError:
pass
Collaborator:

I think this should just be an optional top-level attribute on the metadata, rather than keying into a dictionary with a magic name.
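The top-level-attribute alternative might look like this sketch (the `partition_lens` name is an assumption):

```python
class PartitionMetadata:
    # Sketch: expose row counts as an optional attribute instead of
    # a magic "__num_rows__" dictionary key.
    def __init__(self, partition_lens=None):
        self.partition_lens = partition_lens  # list[int] | None


def dataframe_len(metadata, fallback):
    # Cheap path when per-partition lengths are known; otherwise
    # fall back to an actual computation over the data.
    if metadata.partition_lens is not None:
        return sum(metadata.partition_lens)
    return fallback()
```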


# Use partition statistics to check if new index is already sorted
if not pre_sorted and divisions is None:
try:
Collaborator:

This is very exciting to see.

From an API standpoint, I'd like to have a way to determine if a partition is sorted without possibly raising a KeyError in normal usage. That is to say, I think this is a totally reasonable request of the metadata, so it shouldn't raise a KeyError.
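A non-raising query API could use a three-valued answer: True/False when the statistics can decide, None when the information is unavailable. This is a hypothetical sketch of that contract:

```python
def is_sorted_by(statistics, column):
    # Sketch: answer the sortedness question without raising KeyError;
    # "unknown" is a normal outcome, not an error.
    col = statistics.get(column)
    if col is None:
        return None  # statistics unavailable for this column
    mins, maxs = col["min"], col["max"]
    # Sorted across partitions if each partition's max <= next partition's min
    return all(maxs[i] <= mins[i + 1] for i in range(len(mins) - 1))


stats = {"A": {"min": [0, 10], "max": [9, 19]}}
```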

meta=no_default,
out=None,
transform_divisions=True,
partition_metadata=None,
Collaborator:

Who is responsible for making sure that meta and partition_metadata are in sync? For more internal APIs such as this, I'd probably want to just replace meta with partition_metadata.
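One way to remove the synchronization question is to make partition_metadata the single source of truth and have internal APIs read `meta` through it, so the two can never disagree. A sketch of that direction (names hypothetical):

```python
class PartitionMetadata:
    # Sketch: `meta` lives only inside partition_metadata, so callers
    # cannot pass a stale meta alongside it.
    def __init__(self, meta):
        self._meta = meta

    @property
    def meta(self):
        return self._meta


def map_partitions_sketch(func, partition_metadata):
    # Internal APIs accept only partition_metadata; meta is derived,
    # never passed as a separate (possibly out-of-sync) argument.
    return func(partition_metadata.meta)
```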



-def new_dd_object(dsk, name, meta, divisions, parent_meta=None):
+def new_dd_object(dsk, name, meta, divisions, parent_meta=None, **metadata_kwargs):
Collaborator:

This looks like a great opportunity for consolidation of kwargs into a single metadata object (I suspect that's on your mind here, but out-of-scope for this POC):

def new_dd_object(dsk, name, partition_metadata):

@github-actions github-actions bot added the dispatch Related to `Dispatch` extension objects label Oct 26, 2022
rapids-bot bot pushed a commit to rapidsai/cudf that referenced this pull request Oct 27, 2022
Removes unnecessary code from `dask_cudf.core._Frame` that is already handled in the super-class (`dask.dataframe.core._Frame`). By removing the unnecessary `__init__` logic from `dask_cudf`, we can avoid breakages from upstream changes like dask/dask#9473.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #12001
