
Add base CollectionOperation, FrameOperation and CompatFrameOperation classes #9119

Merged
rjzamora merged 17 commits into dask:collection-refactor from rjzamora:initial-collection-operation
May 28, 2022

Conversation

@rjzamora (Member)

Addresses "Step 2" of "DataFrame Development" plan described in #9117

This PR does not focus on graph materialization/optimization, nor does it define any specialized CollectionOperation logic for DataFrame collections. However, it does define the basic API that will be used for these purposes moving forward (so a reasonable consensus on these changes is very desirable).

Comment on lines -135 to +136
def __init__(self, dsk, name, meta, divisions=None):
def __init__(self, dsk=None, name=None, meta=None, divisions=None, operation=None):
Member Author

Not sure of the best way to support "old" and "new" mechanisms for initializing collections. Other ideas are welcome.

The vision here is to make the entire state of a collection depend on the operation. However, we are a pretty long way from being able to require collections to be initialized this way. Also, when we do remove the legacy arguments, it may make sense to support the data= behavior of pandas (and automatically create a creation operation under the hood).

Comment on lines +411 to +413
@_meta.setter
def _meta(self, value):
self._operation = self.operation.replay(meta=value)
Member Author

Note that this will not change the name of the new operation (and therefore the collection). However, the FrameOperation will be hashed using name/meta/divisions, so it is still important that we "replay" operations when a property is changed.

Note that I originally wanted to require the operation's name to uniquely depend on its properties, but this doesn't work for CompatFrameOperation (where we cannot change the name used in the existing HLG).
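A minimal sketch of the "replay" semantics described above (`_Op` is a hypothetical stand-in for a FrameOperation; here `replay` is just `dataclasses.replace`, swapping properties while keeping the name):

```python
from dataclasses import dataclass, replace
from typing import Any

@dataclass(frozen=True)
class _Op:
    # Minimal stand-in for a FrameOperation
    name: str
    meta: Any
    divisions: tuple

    def replay(self, **changes) -> "_Op":
        # New operation with updated properties; the name is left unchanged
        return replace(self, **changes)

op = _Op("x", "int64", (0, 10))
op2 = op.replay(meta="float64")
# op2 keeps the same name, but compares/hashes differently because
# its properties changed
```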

Comment on lines +448 to +463
@property
def _divisions(self):
warnings.warn(
"_divisions property is deprecated. Use divisions instead.",
FutureWarning,
)
return self.divisions

@_divisions.setter
def _divisions(self, value):
warnings.warn(
"Settable _divisions property is deprecated. "
"Set divisions attribute directly instead.",
FutureWarning,
)
self.divisions = value
Member Author

Do we want to define divisions and _divisions at the collection level? I'm choosing to deprecate _divisions to simplify things a bit. Though, it does feel strange to encourage users to set .divisions directly...

Member

Generally I'd vote for just going with divisions. Do we have a read on how much usage the divisions setter gets? I could be totally wrong here but should people typically be using repartition instead of directly setting divisions?

dsk=dsk, name=name, meta=meta, divisions=divisions, operation=operation
)

# TODO: Get "collection_annotations" info from operation
Member Author

I suppose we should start planning early for the CollectionOperation version of annotations/collection_annotations

Collaborator

Sigh...

Comment on lines +4546 to 4552
if key == "_operation":
object.__setattr__(self, key, value)

try:
columns = object.__getattribute__(self, "_meta").columns
except AttributeError:
columns = ()
Member Author

Note that this breaks dask_cudf (since its DataFrame initializer doesn't use the operation= approach internally).

@douglasdavis (Member) left a comment

Thanks a lot for laying out your plan, @rjzamora! It definitely makes things a bit easier to follow. Chiming in with some small typing comments here now, and still doing a bit more digesting.

"""
raise NotImplementedError

def reinitialize(self, new_dependencies: dict, **new_kwargs) -> CollectionOperation:
Member

Since CollectionOperation inheriting classes are going to implement this method the return type should be a bound TypeVar (eventually a Self); perhaps something like

CollOpT = TypeVar("CollOpT", bound="CollectionOperation")
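A sketch of what that bound-TypeVar signature might look like (class bodies are placeholders, not the real implementations):

```python
from typing import TypeVar

# Bound TypeVar: stands in for "the concrete subclass being called"
CollOpT = TypeVar("CollOpT", bound="CollectionOperation")

class CollectionOperation:
    def reinitialize(self: CollOpT, new_dependencies: dict, **new_kwargs) -> CollOpT:
        """Subclasses return an instance of their own type."""
        raise NotImplementedError

class FrameOperation(CollectionOperation):
    def reinitialize(self, new_dependencies: dict, **new_kwargs) -> "FrameOperation":
        # A type checker now sees FrameOperation here, not the base class
        return type(self)()
```

On Python 3.11+ this could be spelled with `typing.Self` instead of an explicit TypeVar.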

"""Hash a CollectionOperation"""
raise NotImplementedError

def replay(self, **kwargs) -> CollectionOperation:
Member

I think this may also need to be CollOpT-like return typehinted

"""Get the collection keys for this operation"""
raise NotImplementedError

def copy(self) -> CollectionOperation:
Member

I think this may also need to be CollOpT-like return typehinted

@ian-r-rose (Collaborator) commented May 23, 2022

Thanks for keeping after this @rjzamora! My understanding of your introduction above is that this is more-or-less the "API" proposal for this effort. So, in that spirit, I'm doing a bit of a brain dump here after noodling over the API for a couple of days. My main concerns with the API over in #9076 could probably be summarized as follows:

Since the API is designed around "replaying" chains of collection operations, we should make that process as easy as possible to write, and as hard to get wrong as possible. My main two concerns with that PR were both reflections of that desire, namely:

  1. We should avoid @setters, instead forcing people to go through the replay process to change attributes of an operation. This basically seems to be done in this PR. Woo!
  2. The replay operation should be hard to get wrong. There will be a lot of Operations that we'll need to make, and that is a lot of opportunities to mess up forwarding some keyword argument to where it needed to be. In particular, I didn't really like the **new_kwargs being passed to type(self). That felt like a fragile object-oriented tower, and difficult to reason about as you yo-yo up and down the MRO. I spent some time trying to figure out if we could write down a Protocol that forces __init__() and regenerate()/reinitialize() to have (mostly) the same signature, and it was very tricky to get right in the presence of subclassing. Now, one might (rightly) argue that protocols aren't really expressive enough for this, but I do think it speaks to some of the underlying slipperiness of the proposed API. An alternative would be to force both generation and regeneration to take more of the same code path (more later).
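A toy illustration of the fragility described in point 2 (`Op` and `ShuffleOp` are hypothetical, not real dask classes): a generic `type(self)(**new_kwargs)` regeneration breaks as soon as a subclass adds a required `__init__` argument.

```python
class Op:
    def __init__(self, name):
        self.name = name

    def reinitialize(self, **new_kwargs):
        kwargs = {"name": self.name}
        kwargs.update(new_kwargs)
        # Assumes every subclass __init__ accepts exactly these kwargs
        return type(self)(**kwargs)

class ShuffleOp(Op):
    def __init__(self, name, npartitions):  # extra required argument
        super().__init__(name)
        self.npartitions = npartitions

op = ShuffleOp("shuffle", 4)
# op.reinitialize(name="shuffle-2") raises TypeError: the generic
# reinitialize never learned to forward npartitions
```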

An aside: a lot of my thinking here is influenced by React. That is mostly for building user interfaces, but there are a lot of features that we have in common, namely generalized "components" in a tree structure, a strong preference for declarative, immutable design, and tree traversal algorithms that allow it to only update UI elements that need it. But what I think is most interesting about React is that they've put together an API for this that everyone mostly likes to work with.

So I'm still thinking this through, and I wanted to explore a tweak to your design here: what if the CollectionOperations used dataclasses? I think that there are some interesting common features/intentions between what you have done here and dataclasses. These include:

  • Easy to enforce immutability (frozen=True)
  • Built-in support for generating __hash__ from an object's attributes. N.b., I still expect we'd want override it with our own tokenization, so I bring this up mostly to point out the intent.

But I think that actually using a dataclass could bring some additional benefits:

  • More code generation for __init__(), possibly also for __eq__(). This would mean that the "bag-of-kwargs" would no longer be allowed, instead every operation would only accept keyword arguments that it knows about.
  • Possibly using dataclasses.replace() instead of reinitialize()
  • Better support from static type checkers :)
  • Stronger guarantees on immutability and hashability

I took a (pseudocode) attempt at writing out some of your Operations here using the above approach:

from __future__ import annotations

from dataclasses import dataclass, field, InitVar
from typing import Any, Hashable, Generic, TypeVar, Tuple
from typing_extensions import TypeAlias

from dask.base import tokenize


KeyType = TypeVar("KeyType", bound=Hashable)

@dataclass(frozen=True)
class CollectionOperation(Generic[KeyType]):
    """CollectionOperation class

    Encapsulates the state and graph-creation logic for
    a generic Dask collection.
    """
    name: str
    dependencies: frozenset[CollectionOperation[Any]] = field(init=False)
    replace_dependencies: InitVar[dict[str, CollectionOperation[Any]]]

    def __post_init__(self, replace_dependencies):
        # Escape hatch to perform any post-initialization logic,
        # could be used to patch up the dependencies with replace_dependencies
        pass

    def subgraph(
        self, keys: list[tuple[KeyType]]
    ) -> tuple[dict[KeyType, Any], dict[CollectionOperation[Any], list[tuple[Hashable]]]]:
        """Return the subgraph and key dependencies for this operation

        NOTE: Since this method returns a mapping between dependencies
        and required keys, the logic may include recursive fusion.

        Parameters
        ----------
        keys : list[tuple]
            List of required output keys needed from this collection

        Returns
        -------
        graph : dict
            The subgraph for the current operation
        dependencies : dict[CollectionOperation, list[tuple]]
            A dictionary mapping ``CollectionOperation`` objects
            to required keys. This dictionary will be used by the
            global graph-generation algorithm to determine which
            operation-key combinations need to be materialized after
            this operation.
        """
        raise NotImplementedError

    @property
    def collection_keys(self) -> list[KeyType]:
        """Get the collection keys for this operation"""
        raise NotImplementedError

PartitionKey: TypeAlias = Tuple[str, int] 

@dataclass(frozen=True)
class FrameOperation(CollectionOperation[PartitionKey]):
    """Abstract DataFrame-based CollectionOperation"""
    meta: Any
    divisions: tuple[Any, ...]

    @property
    def npartitions(self) -> int:
        """Return partition count"""
        return len(self.divisions) - 1

    @property
    def collection_keys(self) -> list[PartitionKey]:
        """Return list of all collection keys"""
        return [(self.name, i) for i in range(self.npartitions)]

    def __hash__(self):
        """Hash a FrameOperation using its name, meta and divisions"""
        return hash(tokenize(self.name, self.meta, self.divisions))

Note that it doesn't really look too different. The main change is that I've added an InitVar for new_dependencies, meaning that it is only passed into __init__()/__post_init__(), and doesn't exist as a field on the class. The rest of the differences would (I think) mostly happen around validation, type checking, and general developer experience. Ultimately dataclasses are not very complicated, so it wouldn't be too much work to accomplish all of the above ourselves (or retrofitting things later if PEP 681 is accepted). But I like the idea of operating under some constraints/guiderails, and think something like this could help provide it.
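For reference, a minimal (hypothetical) demonstration of the `dataclasses.replace()` idea mentioned above, showing both the immutability and the rejection of unknown keyword arguments:

```python
from dataclasses import dataclass, replace
from typing import Any

@dataclass(frozen=True)
class FrameOp:
    name: str
    meta: Any
    divisions: tuple

op = FrameOp("x", "int64", (0, 5, 10))
op2 = replace(op, meta="float64")  # new object; op itself is untouched

# Unknown keywords are rejected, unlike a bag-of-kwargs reinitialize:
try:
    replace(op, metta="oops")  # typo caught immediately with a TypeError
except TypeError:
    pass
```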

@rjzamora (Member Author)

Thanks for the quick feedback @douglasdavis and @ian-r-rose!

@ian-r-rose - I am new to dataclasses, but that does seem like a clean way to capture much of what we are trying to do here (especially the immutability). I'll experiment with your idea. Are there any limitations of dataclasses that you think could become a problem? Will we be able to handle property caching and multiple inheritance? (sorry - I haven't gotten a chance to read up on this yet)

@ian-r-rose (Collaborator)

Are there any limitations of dataclasses that you think could become a problem? Will we be able to handle property caching and multiple inheritance?

Well, with frozen=True it makes it just as hard for the class internals to change the state as a user of the class. So it would make it annoying to cache a property or to edit the dependencies (e.g., taking into account new_dependencies) after the fact. I believe the way to do it would be something like object.__setattr__(self, attr, value). Doable if it's an occasional optimization, obnoxious if it's all the time. One hopes that if the design is right, it's actually not too expensive to re-compute things (I view "laziness in rendering" as an essential feature of declarative designs).

As for multiple inheritance, I believe that works without any caveats.

I haven't 100% convinced myself that this is a great idea, but sort of came around to it after failing to come up with a protocol that I liked.
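A small sketch of the `object.__setattr__` caching pattern described above (`Op` and the cache-slot name are hypothetical): a frozen dataclass can still lazily cache a derived value by reaching around its own `__setattr__`.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass(frozen=True)
class Op:
    divisions: tuple
    # Cache slot: excluded from __init__, repr, and equality/hash
    _cached_npartitions: Optional[int] = field(
        default=None, init=False, repr=False, compare=False
    )

    @property
    def npartitions(self) -> int:
        if self._cached_npartitions is None:
            # frozen=True blocks normal assignment, so reach around it
            object.__setattr__(self, "_cached_npartitions", len(self.divisions) - 1)
        return self._cached_npartitions
```

Marking the cache slot `compare=False` keeps it out of the generated `__eq__`/`__hash__`, so caching never changes an operation's identity.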

@ian-r-rose (Collaborator)

Another downside of dataclasses is that they have the reputation of being slow. @jcrist might yell at me for suggesting them :)

@rjzamora (Member Author)

Thanks again for the feedback here @ian-r-rose

It does seem like the dataclass approach could be promising, but I definitely don't have the Python knowledge to make a decisive call in that direction. Perhaps it would be useful to start by establishing what a "good" dataclass-free approach might look like (so that we can make a better comparison). Clearly the current state of this PR does not ensure true "immutability".

For example, perhaps it makes sense to simply override __setattr__, and allow the developer to define a clear list of immutable and mutable attributes:

from typing import Tuple

class CollectionOperation:

    __frozen_slots__: Tuple[str, ...] = ("dependencies",)
    __mutable_slots__: Tuple[str, ...] = tuple()
        
    def __setattr__(self, name, value):
        # We override __setattr__ to ensure a Collection
        # object is "immutable" in the sense that the
        # attributes in `__frozen_slots__` may only be initialized
        if name in self.__frozen_slots__:
            if hasattr(self, name):
                # Immutable-attribute error
                msg = f"Attribute '{name}' of '{self.__class__}' is immutable."
                raise AttributeError(msg)
            else:
                # Allow initialization
                object.__setattr__(self, name, value)
        elif name in self.__mutable_slots__:
            # Allow arbitrary mutation
            object.__setattr__(self, name, value)
        else:
            # Missing-attribute error
            msg = f"'{self.__class__}' has no '{name}' attribute."
            raise AttributeError(msg)

I do get the sense that we would ultimately avoid adding attributes to __mutable_slots__ at all costs, but at least something like this establishes a clear boundary between immutable/mutable attributes. Are there other more-conventional ways of writing immutable classes in Python?
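A quick usage sketch of the pattern above (the class is re-declared in condensed form so the snippet is self-contained; `_cache` is a hypothetical mutable slot):

```python
from typing import Tuple

class CollectionOperation:
    __frozen_slots__: Tuple[str, ...] = ("dependencies",)
    __mutable_slots__: Tuple[str, ...] = ("_cache",)  # hypothetical cache slot

    def __setattr__(self, name, value):
        if name in self.__frozen_slots__:
            if hasattr(self, name):
                raise AttributeError(
                    f"Attribute '{name}' of '{self.__class__}' is immutable."
                )
            object.__setattr__(self, name, value)  # allow initialization
        elif name in self.__mutable_slots__:
            object.__setattr__(self, name, value)  # allow arbitrary mutation
        else:
            raise AttributeError(f"'{self.__class__}' has no '{name}' attribute.")

op = CollectionOperation()
op.dependencies = ()   # first assignment is allowed
op._cache = {}         # mutable slot: always allowed
# op.dependencies = (1,) would now raise AttributeError
```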

@ian-r-rose (Collaborator) commented May 24, 2022

For example, perhaps it makes sense to simply override __setattr__, and allow the developer to define a clear list of immutable and mutable attributes:

Overriding __setattr__ is exactly what dataclasses do when frozen=True. I don't necessarily want to belabor using them, since the implementation isn't very complex (the whole file is ~1K LoC), and we could get almost all of their benefits without using them. But reinventing features like "frozen-ness" does make me more inclined to look that direction.

Edit: What sorts of things might you consider putting in the mutable_slots for your above example?

Comment on lines +102 to +103
if new_dependencies:
raise ValueError("CompatFrameOperation cannot have dependencies")
Collaborator

Is this a TODO? Rewriting dependencies in an operation stack with a CompatFrameOperation in the middle sounds like an important part of this to me.

Member Author

I'm still going back and forth on this. I got the impression that the "easiest" approach is to go "all or nothing" on CompatFrameOperation, in the sense that an unsupported API call would simply convert the entire graph to a single CompatFrameOperation. There is obviously good motivation not to do this if we don't need to. However, the subgraph method requires the operation to materialize its own graph and specify the specific keys required by its dependencies. I had trouble thinking of a simple way to "automatically" support this.

@ian-r-rose (Collaborator)

I should also add @rjzamora that since this is against a long-lived feature branch, I don't want to stand in the way of merging proposals to the design early.

@rjzamora (Member Author)

What sorts of things might you consider putting in the mutable_slots for your above example?

I imagine this being for caching (a fallback mechanism for avoiding repeated work if there is no other way around it).

@rjzamora (Member Author)

I should also add @rjzamora that since this is against a long-lived feature branch, I don't want to stand in the way of merging proposals to the design early.

I don't think we should let any PR sit for too long in this feature branch, but I do think this initial design is important enough to discuss it for a few days (if needed). We can always make changes to the base CollectionOperation design later on, but it is certainly easiest to change things now :)

@ian-r-rose (Collaborator)

Thanks for experimenting with the @dataclass idea. I did the same over in ian-r-rose@8d52345. In so doing, I came across a couple of additional drawbacks worth noting (I suspect you hit them as well):

  1. It's harder to normalize inputs to a common type, c.f. accepting dict | HighLevelGraph -> HighLevelGraph
  2. Because of the way the __init__ is generated, it's hard to give default values to attributes in base classes (see discussion here). This is fixed in python 3.10, but still a problem in prior versions. So it would probably require instantiators of the class to provide extra boilerplate, or to create some additional factory methods.

I'm not certain whether these are fatal to the idea; how are you finding the experience?
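Drawback 2 is easy to demonstrate in isolation (a minimal sketch; class names are hypothetical):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Base:
    dependencies: tuple = ()  # base class supplies a default ...

try:
    @dataclass(frozen=True)
    class Child(Base):
        meta: Any  # ... so a required subclass field is rejected
    defined = True
except TypeError:
    # "non-default argument 'meta' follows default argument";
    # dataclass(kw_only=True) on Python 3.10+ sidesteps this
    defined = False
```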

Comment on lines +29 to +31
def replace_divisions(self, value) -> CollectionOperation[tuple[str, int]]:
"""Return a new operation with different divisions"""
raise ValueError(f"divisions cannot be modified for {type(self)}")
Collaborator

Why define this at all instead of going through reinitialize?

Member Author

I was thinking that we may not want to require divisions and meta to be data fields on all operation objects.


from dask.base import tokenize

KeyType = TypeVar("KeyType", bound=Hashable)
Collaborator

This KeyType stuff is orthogonal to the actual class representation, but I liked it.

@github-actions bot added the io label May 26, 2022
@rjzamora (Member Author)

I implemented some more-complex fusion logic, and it seems like dataclasses provide what we need as long as optional arguments are avoided on anything other than "leaf" classes.

Note that I was a bit overwhelmed by the type-annotation warnings when I tried to use TypeVar etc. So, I think I'll probably leave that for later (or for someone else to tackle) :)

Comment on lines +77 to +89
@property
def meta(self) -> Any:
return self._meta

def replace_meta(self, value) -> CompatFrameOperation:
return replace(self, _meta=value)

@property
def divisions(self) -> tuple | None:
return tuple(self._divisions) if self._divisions is not None else None

def replace_divisions(self, value) -> CompatFrameOperation:
return replace(self, _divisions=value)
Collaborator

I'm not convinced that this is any easier to use than just making meta and divisions proper fields on the FrameOperation class:

@dataclass(frozen=True)
class FrameOperation(CollectionOperation):
    """Abstract DataFrame-based CollectionOperation"""
    meta: Any
    divisions: tuple[Any, ...]

op = FrameOperation(meta=df, divisions=(None,)*npartitions)
op.reinitialize(meta=new_meta)

It forces the positioning of the args in the __init__, but perhaps it's not too bad for these cases?

Member Author

My interest in avoiding "field" status for meta and divisions is entirely because I imagine that we will eventually want to allow the Operation itself to figure out what the meta and divisions should be. If we add meta and divisions as fields, we are establishing that we typically expect meta and divisions to be known before a FrameOperation is initialized. With that said, this assumption will certainly be true at first, and I could be wrong about the benefits of breaking away from this behavior.
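A sketch of the alternative being described here: meta/divisions as derived properties rather than dataclass fields, so the operation computes them from its dependency instead of needing them at `__init__` time (all names are hypothetical):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Source:
    # An upstream operation that does know its meta/divisions
    name: str
    meta: Any
    divisions: tuple

@dataclass(frozen=True)
class Head:
    # Downstream operation: meta/divisions are *derived*, not fields,
    # so nothing needs to be known before __init__
    name: str
    dependency: Source

    @property
    def meta(self):
        return self.dependency.meta  # same meta as the input

    @property
    def divisions(self):
        return self.dependency.divisions[:2]  # only the first partition

head = Head("head", Source("src", "int64-frame", (0, 10, 20)))
# head.meta == "int64-frame"; head.divisions == (0, 10)
```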

Collaborator

That makes sense to me. I still like the idea of having a single entrypoint for "replaying", but there's time and room to experiment here.

@rjzamora rjzamora merged commit 68060af into dask:collection-refactor May 28, 2022
@ian-r-rose (Collaborator)

Off to the races!

@rjzamora (Member Author) commented Jun 2, 2022

@ian-r-rose - Just a note that my next PR will likely revise a lot of what was merged here :)

While thinking through the migration process, I realized that it would be a complete headache to add if statements all over the existing DataFrame/Series/Array APIs to redirect the code to operation-based logic. Therefore, I am now planning to drop the Compat*Operation approach, and simply implement operation-based subclasses. These subclasses will ultimately need to contain a lot of duplicated code, but it should make migration more manageable (although it is still daunting).

@rjzamora rjzamora deleted the initial-collection-operation branch June 8, 2022 18:26