Add base CollectionOperation, FrameOperation and CompatFrameOperation classes#9119
Conversation
| def __init__(self, dsk, name, meta, divisions=None): | ||
| def __init__(self, dsk=None, name=None, meta=None, divisions=None, operation=None): |
There was a problem hiding this comment.
Not sure of the best way to support "old" and "new" mechanisms for initializing collections. Other ideas are welcome.
The vision here is to make the entire state of a collection depend on the operation. However, we are a pretty long way from being able to require collections to be initialized this way. Also, when we do remove the legacy arguments, it may make sense to support the data= behavior of pandas (and automatically create a creation operation under the hood).
dask/dataframe/core.py
Outdated
| @_meta.setter | ||
| def _meta(self, value): | ||
| self._operation = self.operation.replay(meta=value) |
There was a problem hiding this comment.
Note that this will not change the name of the new operation (and therefore the collection). However, the FrameOperation will be hashed using name/meta/divisions, so it is still important that we "replay" operations when a property is changed.
Note that I originally wanted to require the operation's name to uniquely depend on its properties, but this doesn't work for CompatFrameOperation (where we cannot change the name used in the existing HLG).
| @property | ||
| def _divisions(self): | ||
| warnings.warn( | ||
| "_divisions property is deprecated. Use divisions instead.", | ||
| FutureWarning, | ||
| ) | ||
| return self.divisions | ||
|
|
||
| @_divisions.setter | ||
| def _divisions(self, value): | ||
| warnings.warn( | ||
| "Settable _divisions property is deprecated. " | ||
| "Set divisions attribute directly instead.", | ||
| FutureWarning, | ||
| ) | ||
| self.divisions = value |
There was a problem hiding this comment.
Do we want to define divisions and _divisions at the collection level? I'm choosing to deprecate _divisions to simplify things a bit. Though, it does feel strange to encourage users to set .divisions directly...
There was a problem hiding this comment.
Generally I'd vote for just going with divisions. Do we have a read on how much usage the divisions setter gets? I could be totally wrong here but should people typically be using repartition instead of directly setting divisions?
| dsk=dsk, name=name, meta=meta, divisions=divisions, operation=operation | ||
| ) | ||
|
|
||
| # TODO: Get "collection_annotations" info from operation |
There was a problem hiding this comment.
I suppose we should start planning early for the CollectionOperation version of annotations/collection_annotations
| if key == "_operation": | ||
| object.__setattr__(self, key, value) | ||
|
|
||
| try: | ||
| columns = object.__getattribute__(self, "_meta").columns | ||
| except AttributeError: | ||
| columns = () |
There was a problem hiding this comment.
Note that this breaks dask_cudf (since its DataFrame initializer doesn't use the operation= approach internally).
douglasdavis
left a comment
There was a problem hiding this comment.
Thanks a lot for laying out your plan, @rjzamora! It definitely makes things a bit easier to follow. Chiming in with some small typing comments here now, and still doing a bit more digesting.
dask/operation.py
Outdated
| """ | ||
| raise NotImplementedError | ||
|
|
||
| def reinitialize(self, new_dependencies: dict, **new_kwargs) -> CollectionOperation: |
There was a problem hiding this comment.
Since CollectionOperation inheriting classes are going to implement this method the return type should be a bound TypeVar (eventually a Self); perhaps something like
CollOpT = TypeVar("CollOpT", bound="CollectionOperation")
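To make the suggestion concrete, a self-contained sketch (the class bodies are dummies, not dask's actual code): binding the TypeVar means a type checker sees `FrameOperation.reinitialize` return `FrameOperation` rather than the base class, which is the pre-3.11 spelling of what `typing.Self` expresses today.

```python
from typing import Any, TypeVar

CollOpT = TypeVar("CollOpT", bound="CollectionOperation")


class CollectionOperation:
    def reinitialize(self: CollOpT, new_dependencies: dict, **new_kwargs: Any) -> CollOpT:
        # Generic implementation: construct a new instance of the same class,
        # so the annotated self-type flows through to the return type
        return type(self)()


class FrameOperation(CollectionOperation):
    pass


op = FrameOperation()
new_op = op.reinitialize({})  # a type checker infers FrameOperation here
```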
dask/operation.py
Outdated
| """Hash a CollectionOperation""" | ||
| raise NotImplementedError | ||
|
|
||
| def replay(self, **kwargs) -> CollectionOperation: |
There was a problem hiding this comment.
I think this may also need to be CollOpT-like return typehinted
dask/operation.py
Outdated
| """Get the collection keys for this operation""" | ||
| raise NotImplementedError | ||
|
|
||
| def copy(self) -> CollectionOperation: |
There was a problem hiding this comment.
I think this may also need to be CollOpT-like return typehinted
|
Thanks for keeping after this @rjzamora! My understanding of your introduction above is that this is more-or-less the "API" proposal for this effort. So, in that spirit, I'm doing a bit of a brain dump here after noodling over the API for a couple of days. My main concerns with the API over in #9076 could probably be summarized by the following: since the API is designed around "replaying" chains of collection operations, we should make that process as easy as possible to write, and as hard to get wrong as possible. My two main concerns with that PR were both reflections of that desire, namely:
An aside: a lot of my thinking here is influenced by React. That is mostly for building user interfaces, but there are a lot of features that we have in common, namely generalized "components" in a tree structure, a strong preference for declarative, immutable design, and tree traversal algorithms that allow it to only update UI elements that need it. But what I think is most interesting about React is that they've put together an API for this that everyone mostly likes to work with. So I'm still thinking this through, and I wanted to explore a tweak to your design here: what if the
But I think that actually using a
I took a (pseudocode) attempt at writing out some of your classes using frozen dataclasses:

```python
from __future__ import annotations

from dataclasses import InitVar, dataclass, field
from typing import Any, Generic, Hashable, Tuple, TypeVar

from typing_extensions import TypeAlias

from dask.base import tokenize

KeyType = TypeVar("KeyType", bound=Hashable)


@dataclass(frozen=True)
class CollectionOperation(Generic[KeyType]):
    """CollectionOperation class

    Encapsulates the state and graph-creation logic for
    a generic Dask collection.
    """

    name: str
    dependencies: frozenset[CollectionOperation[Any]] = field(init=False)
    replace_dependencies: InitVar[dict[str, CollectionOperation[Any]]]

    def __post_init__(self, replace_dependencies):
        # Escape hatch to perform any post-initialization logic,
        # could be used to patch up the dependencies with replace_dependencies
        pass

    def subgraph(
        self, keys: list[tuple[KeyType]]
    ) -> tuple[dict[KeyType, Any], dict[CollectionOperation[Any], list[tuple[Hashable]]]]:
        """Return the subgraph and key dependencies for this operation

        NOTE: Since this method returns a mapping between dependencies
        and required keys, the logic may include recursive fusion.

        Parameters
        ----------
        keys : list[tuple]
            List of required output keys needed from this collection

        Returns
        -------
        graph : dict
            The subgraph for the current operation
        dependencies : dict[CollectionOperation, list[tuple]]
            A dictionary mapping ``CollectionOperation`` objects
            to required keys. This dictionary will be used by the
            global graph-generation algorithm to determine which
            operation-key combinations need to be materialized after
            this operation.
        """
        raise NotImplementedError

    @property
    def collection_keys(self) -> list[KeyType]:
        """Get the collection keys for this operation"""
        raise NotImplementedError


PartitionKey: TypeAlias = Tuple[str, int]


@dataclass(frozen=True)
class FrameOperation(CollectionOperation[PartitionKey]):
    """Abstract DataFrame-based CollectionOperation"""

    meta: Any
    divisions: tuple[Any]

    @property
    def npartitions(self) -> int:
        """Return partition count"""
        return len(self.divisions) - 1

    @property
    def collection_keys(self) -> list[PartitionKey]:
        """Return list of all collection keys"""
        return [(self.name, i) for i in range(self.npartitions)]

    def __hash__(self):
        """Hash a FrameOperation using its name, meta and divisions"""
        return hash(tokenize(self.name, self.meta, self.divisions))
```

Note that it doesn't really look too different. The main change is that I've added an `InitVar` for swapping in new dependencies at construction time. |
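A runnable distillation of the sketch above (using plain `hash` on the fields in place of `dask.base.tokenize`, to keep it dependency-free): frozen instances reject mutation, and the hash is derived from name/meta/divisions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class FrameOperation:
    name: str
    meta: Any
    divisions: tuple

    @property
    def npartitions(self) -> int:
        return len(self.divisions) - 1

    def __hash__(self):
        # Stand-in for hash(tokenize(...)); dataclass keeps an
        # explicitly defined __hash__ even when frozen=True
        return hash((self.name, self.meta, self.divisions))


op = FrameOperation(name="x", meta=None, divisions=(0, 5, 10))
try:
    op.name = "y"  # frozen=True makes assignment raise
    frozen = False
except AttributeError:  # dataclasses.FrozenInstanceError subclasses AttributeError
    frozen = True
```

Two operations with equal name/meta/divisions hash identically, which is the property the `__hash__` in the sketch relies on.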
|
Thanks for the quick feedback @douglasdavis and @ian-r-rose! @ian-r-rose - I am new to dataclasses, but that does seem like a clean way to capture much of what we are trying to do here (especially the immutability). I'll experiment with your idea. Are there any limitations of dataclasses that you think could become a problem? Will we be able to handle property caching and multiple inheritance? (sorry - I haven't gotten a chance to read up on this yet) |
As for multiple inheritance, I believe that works without any caveats. I haven't 100% convinced myself that this is a great idea, but sort of came around to it after failing to come up with a protocol that I liked. |
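One data point on the property-caching question (my own observation, not from the thread): `functools.cached_property` appears to work on a `frozen=True` dataclass, because it writes the cached value into the instance `__dict__` directly rather than going through the blocked `__setattr__`.

```python
from dataclasses import dataclass
from functools import cached_property


@dataclass(frozen=True)
class Op:
    divisions: tuple

    @cached_property
    def npartitions(self) -> int:
        # Computed on first access, then cached in the instance __dict__
        return len(self.divisions) - 1


op = Op(divisions=(0, 1, 2))
first = op.npartitions
cached = "npartitions" in op.__dict__  # populated after first access
```

This relies on the class not defining `__slots__`, since `cached_property` needs a writable instance `__dict__`.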
|
Another downside of dataclasses is that they have the reputation of being slow. @jcrist might yell at me for suggesting them :) |
|
Thanks again for the feedback here @ian-r-rose It does seem like the dataclass approach could be promising, but I definitely don't have the Python knowledge to make a decisive call in that direction. Perhaps it would be useful to start by establishing what a "good" dataclass-free approach might look like (so that we can make a better comparison). Clearly the current state of this PR does not ensure true "immutability". For example, perhaps it makes sense to simply override `__setattr__`:

```python
class CollectionOperation:

    __frozen_slots__: tuple[str, ...] = ("dependencies",)
    __mutable_slots__: tuple[str, ...] = tuple()

    def __setattr__(self, name, value):
        # We override __setattr__ to ensure a CollectionOperation
        # object is "immutable" in the sense that the attributes
        # in `__frozen_slots__` may only be initialized once
        if name in self.__frozen_slots__:
            if hasattr(self, name):
                # Immutable-attribute error
                msg = f"Attribute '{name}' of '{self.__class__}' is immutable."
                raise AttributeError(msg)
            else:
                # Allow initialization
                object.__setattr__(self, name, value)
        elif name in self.__mutable_slots__:
            # Allow arbitrary mutation
            object.__setattr__(self, name, value)
        else:
            # Missing-attribute error
            msg = f"'{self.__class__}' has no '{name}' attribute."
            raise AttributeError(msg)
```

I do get the sense that we would ultimately avoid adding attributes to `__mutable_slots__`. |
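A self-contained usage sketch of that `__setattr__` pattern (class trimmed to the essentials, names hypothetical): the frozen slot accepts exactly one assignment, and anything else is rejected.

```python
class _FrozenDemo:
    """Trimmed-down hypothetical version of the __frozen_slots__ idea."""

    __frozen_slots__ = ("dependencies",)
    __mutable_slots__ = ()

    def __setattr__(self, name, value):
        if name in self.__frozen_slots__ and hasattr(self, name):
            # Already initialized: reject reassignment
            raise AttributeError(f"Attribute '{name}' is immutable.")
        if name in self.__frozen_slots__ + self.__mutable_slots__:
            object.__setattr__(self, name, value)
        else:
            raise AttributeError(f"No '{name}' attribute.")


d = _FrozenDemo()
d.dependencies = ()  # first assignment (initialization) succeeds
try:
    d.dependencies = (1,)  # second assignment is rejected
    mutated = True
except AttributeError:
    mutated = False
```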
Overriding `__setattr__` like this seems workable to me. Edit: What sorts of things might you consider putting in `__mutable_slots__`? |
dask/dataframe/operation.py
Outdated
| if new_dependencies: | ||
| raise ValueError("CompatFrameOperation cannot have dependencies") |
There was a problem hiding this comment.
Is this a TODO? Rewriting dependencies in an operation stack with a CompatFrameOperation in the middle sounds like an important part of this to me.
There was a problem hiding this comment.
I'm still going back and forth on this. I got the impression that the "easiest" approach is to go "all or nothing" on CompatFrameOperation, in the sense that an unsupported API call would simply convert the entire graph to a single CompatFrameOperation. There is obviously good motivation not to do this if we don't need to. However, the subgraph method requires the operation to materialize its own graph and specify the specific keys required by its dependencies, and I had trouble thinking of a simple way to "automatically" support this.
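For concreteness, a toy sketch of the `subgraph` contract being discussed (hypothetical classes, not dask's actual API): each operation returns its own task graph plus a mapping from each dependency to the keys it needs, which is what makes key-selective "replay" possible.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class _Input:
    """A "creation" operation that materializes its own data."""

    name: str
    npartitions: int

    def subgraph(self, keys):
        # No dependencies: produce data for the requested keys directly
        return {k: k[1] for k in keys}, {}


@dataclass(frozen=True)
class _Elementwise:
    """Applies func partition-wise to a single dependency."""

    name: str
    func: Callable
    dependency: _Input

    def subgraph(self, keys):
        # Each output key needs exactly the matching key of the dependency
        dep_keys = [(self.dependency.name, i) for _, i in keys]
        graph = {
            (self.name, i): (self.func, (self.dependency.name, i))
            for _, i in keys
        }
        return graph, {self.dependency: dep_keys}


inp = _Input("inp", 3)
op = _Elementwise("add1", lambda x: x + 1, inp)
graph, deps = op.subgraph([("add1", 0), ("add1", 2)])
```

The returned `deps` mapping tells the global graph-generation algorithm which keys of `inp` must be materialized next; only partitions 0 and 2 are requested, so partition 1 never appears.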
|
I should also add @rjzamora that since this is against a long-lived feature branch, I don't want to stand in the way of merging proposals to the design early. |
I imagine this being for caching (a fallback mechanism for avoiding repeated work if there is no other way around it). |
I don't think we should let any PR sit for too long in this feature branch, but I do think this initial design is important enough to discuss it for a few days (if needed). We can always make changes to the base CollectionOperation design later on, but it is certainly easiest to change things now :) |
|
Thanks for experimenting with the
I'm not certain whether these are fatal to the idea; how are you finding the experience? |
dask/dataframe/operation.py
Outdated
| def replace_divisions(self, value) -> CollectionOperation[tuple[str, int]]: | ||
| """Return a new operation with different divisions""" | ||
| raise ValueError(f"divisions cannot be modified for {type(self)}") |
There was a problem hiding this comment.
Why define this at all instead of going through reinitialize?
There was a problem hiding this comment.
I was thinking that we may not want to require divisions and meta to be data fields on all operation objects.
|
|
||
| from dask.base import tokenize | ||
|
|
||
| KeyType = TypeVar("KeyType", bound=Hashable) |
There was a problem hiding this comment.
This KeyType stuff is orthogonal to the actual class representation, bit I liked it.
|
I implemented some more-complex fusion logic, and it seems like dataclasses provide what we need as long as optional arguments are avoided on anything other than "leaf" classes. Note that I was a bit overwhelmed by the type-annotation warnings when I tried to use |
| @property | ||
| def meta(self) -> Any: | ||
| return self._meta | ||
|
|
||
| def replace_meta(self, value) -> CompatFrameOperation: | ||
| return replace(self, _meta=value) | ||
|
|
||
| @property | ||
| def divisions(self) -> tuple | None: | ||
| return tuple(self._divisions) if self._divisions is not None else None | ||
|
|
||
| def replace_divisions(self, value) -> CompatFrameOperation: | ||
| return replace(self, _divisions=value) |
There was a problem hiding this comment.
I'm not convinced that this is any easier to use than just making meta and divisions proper fields on the FrameOperation class:
```python
@dataclass(frozen=True)
class FrameOperation(CollectionOperation):
    """Abstract DataFrame-based CollectionOperation"""

    meta: Any
    divisions: tuple[Any]


op = FrameOperation(meta=df, divisions=(None,) * npartitions)
op.reinitialize(meta=new_meta)
```

It forces the positioning of the args in the `__init__`, but perhaps it's not too bad for these cases?
There was a problem hiding this comment.
My interest in avoiding "field" status for meta and divisions is entirely because I imagine that we will eventually want to allow the Operation itself to figure out what the meta and divisions should be. If we add meta and divisions as fields, we are establishing that we typically expect meta and divisions to be known before a FrameOperation is initialized. With that said, this assumption will certainly be true at first, and I could be wrong about the benefits of breaking away from this behavior.
There was a problem hiding this comment.
That makes sense to me. I still like the idea of having a single entrypoint for "replaying", but there's time and room to experiment here.
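To make the "single entrypoint for replaying" idea concrete, a hypothetical sketch: with `meta` and `divisions` as proper dataclass fields, one generic `reinitialize` built on `dataclasses.replace` serves every subclass.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class FrameOperation:
    name: str
    meta: Any
    divisions: tuple

    def reinitialize(self, **new_kwargs: Any) -> "FrameOperation":
        # dataclasses.replace builds a new frozen instance with the
        # given fields swapped out; all other fields are carried over
        return replace(self, **new_kwargs)


op = FrameOperation(name="x", meta=None, divisions=(None, None))
op2 = op.reinitialize(divisions=(0, 1, 2))
```

The original instance is untouched, so chains of operations can be replayed without mutating shared state.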
…ping Add some type annotations.
…ial-collection-operation
…ial-collection-operation
|
Off to the races! |
|
@ian-r-rose - Just a note that my next PR will likely revise a lot of what was merged here :) While thinking through the migration process, I realized that it would be a complete headache to add |
Addresses "Step 2" of "DataFrame Development" plan described in #9117
This PR does not focus on graph materialization/optimization, nor does it define any specialized `CollectionOperation` logic for DataFrame collections. However, this PR does define the basic API that will be used for these purposes moving forward (so a reasonable consensus on these changes is very desirable).