[WIP][POC] Rough implementation of HLG/Layer Alternative#9076
Conversation
FWIW, here's a drive-by comment on the general expression design. I much prefer the pattern of separating the transformation passes that you might do on the expression from the classes that implement it. Right now you have:

```python
from functools import singledispatch


class MemoizingVisitor:
    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __call__(self, expr):
        try:
            return self.cache[expr]
        except KeyError:
            return self.cache.setdefault(expr, self.func(expr, self))


@singledispatch
def _optimize(expr, visitor):
    raise AssertionError(f"Unhandled type {type(expr)}")


@_optimize.register(TypeA)
def _(expr, visitor):
    transformed_children = map(visitor, expr.children)
    return expr.regenerate(transformed_children)

...

def optimize(exprs):
    visitor = MemoizingVisitor(_optimize)
    return [visitor(expr) for expr in exprs]
```

You could imagine writing the visitors as fixed points of the recursion and then tying the recursive knot outside, but that makes it slightly difficult to control the visitor order (pre- versus post-, or cutoff at some point). I like this approach though because it gives transparent handling of DAGs (as opposed to trees) without expanding the perceived size, and you can share the cache across transformations if wanted. This relies on your expressions being hashable, so the structural part isn't allowed to be mutable.
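To illustrate the DAG-handling property, here is a runnable sketch of the same pattern. The `Add` expression type and the `calls` counter are hypothetical, added purely for demonstration; a node shared between two parents is only transformed once:

```python
from dataclasses import dataclass
from functools import singledispatch


@dataclass(frozen=True)
class Add:
    children: tuple  # frozen + hashable, so usable as a cache key


class MemoizingVisitor:
    def __init__(self, func):
        self.func = func
        self.cache = {}
        self.calls = 0  # count actual transformations, for demonstration

    def __call__(self, expr):
        try:
            return self.cache[expr]
        except KeyError:
            self.calls += 1
            return self.cache.setdefault(expr, self.func(expr, self))


@singledispatch
def _identity(expr, visitor):
    raise AssertionError(f"Unhandled type {type(expr)}")


@_identity.register(int)
def _(expr, visitor):
    return expr


@_identity.register(Add)
def _(expr, visitor):
    return Add(tuple(map(visitor, expr.children)))


# A diamond-shaped DAG: `shared` appears twice but is only visited once.
shared = Add((1, 2))
root = Add((shared, shared))
visitor = MemoizingVisitor(_identity)
result = visitor(root)
```

Without the cache, the second occurrence of `shared` (and its children) would be re-visited, so the perceived size of the DAG would grow as if it were a tree.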
Thanks for the feedback @wence-! I was definitely planning to clean up the regenerate/optimize traversal design. I just experimented with some quick changes based on your suggestion, and I agree that it is better to decouple traversal from the …
dask/dataframe/operation.py (outdated):

```diff
     self.cache = {}

-    def __call__(self, operation):
+    def __call__(self, operation, **kwargs):
```
Careful here, kwargs are ignored in the cache key, so they aren't allowed to affect the result differently for two otherwise equal operations.
If they need to, then taking a single hashable arg that the specific visitor can deconstruct works well IME:
```python
def __call__(self, op, arg):
    key = (op, arg)
    ...

def _generate_graph(op, visitor, keys):
    dsk, dependency_keys = op.subgraph(dict(keys))
    for dep, dep_keys in dependency_keys.items():
        dsk.update(visitor(dep, keys=tuple(sorted(dep_keys.items()))))
    return dsk

def generate_graph(op):
    return MemoizingVisitor(_generate_graph)(op, keys=tuple(sorted(op.collection_keys)))
```
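To make the difference concrete, here is a small runnable sketch of a memoizing visitor whose cache key includes the extra hashable argument (the `_collect` visitor body is a toy stand-in, not the PR's graph generation):

```python
class MemoizingVisitor:
    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __call__(self, op, arg):
        # The extra argument is part of the cache key, so the same op
        # visited with different arguments is not conflated.
        key = (op, arg)
        try:
            return self.cache[key]
        except KeyError:
            return self.cache.setdefault(key, self.func(op, self, arg))


def _collect(op, visitor, keys):
    # Toy visitor body: just record which keys were requested for op.
    return {op: keys}


visitor = MemoizingVisitor(_collect)
a = visitor("read", ("k0",))
b = visitor("read", ("k0", "k1"))  # different arg, so not served from cache
```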
Right - I missed that. Thanks for pointing this out!
ian-r-rose
left a comment
Thanks for writing this up @rjzamora, it's really helpful to have something concrete to look at. I'm still digesting this, and haven't really looked much at the compatibility layer or visitor pattern, so my comments/questions are mostly around the API for operations.
Things I like:
- Having an operation know about its own dependencies seems like a huge improvement to me.
- Having an operation know about `divisions`, `npartitions`, etc. also seems like a good choice. I wonder if we might also add a `collection_type` that could be `DataFrame`, `Series`, or `Scalar`. That might be too leaky of an abstraction, but I also see that it's somewhat fiddly to reason about what the "output" type of an operation currently is.
- I'm a bit sad to see the `Mapping` interface go, but overall this does look simpler than the status quo.
- It's fewer lines of code than I expected!
My main concern is around writing down some sort of contract for how these operations work. I'm curious if you think it would be helpful to (1) enforce some sort of immutability on Operations to force "replays" of things to always go through regenerate(), and (2) write down a semi-strict Protocol for the interface. I always like operating under some constraints like those, and find it really helpful when both implementing and designing interfaces.
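As a sketch of what those two constraints could look like together (all class names here are hypothetical, not the PR's API): a `typing.Protocol` pins down the interface, and a frozen dataclass forces any "change" to go through `regenerate()`:

```python
from dataclasses import dataclass, replace
from typing import Protocol, runtime_checkable


@runtime_checkable
class OperationProtocol(Protocol):
    """A semi-strict contract for operations (illustrative only)."""
    npartitions: int

    def regenerate(self, **changes) -> "OperationProtocol": ...


@dataclass(frozen=True)
class FromPartitions:
    npartitions: int
    label: str

    def regenerate(self, **changes) -> "FromPartitions":
        # Immutability means "mutation" is really reconstruction;
        # dataclasses.replace builds a new instance with changed fields.
        return replace(self, **changes)


op = FromPartitions(npartitions=4, label="x")
op2 = op.regenerate(npartitions=8)  # op itself is untouched
```

With `frozen=True`, attempting `op.npartitions = 8` raises `FrozenInstanceError`, so replays are funneled through `regenerate()` by construction.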
```python
        raise NotImplementedError

    @property
    def collection_keys(self) -> list[tuple]:
```
Not sure we want to specify this just yet (I don't think delayed would satisfy it).
```diff
-    def collection_keys(self) -> list[tuple]:
+    def collection_keys(self) -> list[Hashable]:
```
dask/dataframe/operation.py (outdated):

```python
    @property
    def meta(self):
        return self._meta

    @meta.setter
    def meta(self, value):
        self._meta = value
```
Thanks for linking that. Note that we could allow the meta to be changed on all CollectionOperation objects, but I think we would need to regenerate the operation.
```python
        return len(self.divisions) - 1

    @property
    def collection_keys(self) -> list[tuple]:
```
```diff
-    def collection_keys(self) -> list[tuple]:
+    def collection_keys(self) -> list[tuple[str, int]]:
```
dask/operation.py (outdated):

```python
        return self._dask

    def subgraph(self, keys: list[tuple]) -> tuple[dict, dict]:
        """Return the subgraph and key dependencies for this operation"""
```
If I understand correctly, this is on the list of "not-safe-to-call" without incurring extra work (whether we call it materialization or not). I think one of the things that hampered HLG work was it not being clear which pieces of the API were safe and which were not when working with them, so I'd be in favor of adding big blinking lights wherever possible.
```python
        return {}

    def regenerate(self, new_dependencies: dict, **new_kwargs):
        if "filters" in new_kwargs:
```
I don't see filters above?
Right - This is the messy case for regenerate where we are actually regenerating the original collection via creation_info. This specific approach still needs some cleanup/standardization.
```python
        self._dependencies = {
            arg.name: arg for arg in self.args if isinstance(arg, CollectionOperation)
        }
        divisions = divisions or (None,) * (
```
The expectation at this point is that `self._dependencies` is non-empty (which I think is the intent, but some error checking would be good)
```python
        self,
        func,
        meta,
        *args,
        divisions=None,
        label=None,
        token=None,
        columns=None,
        **kwargs,
```
One thing that I've never liked about the current map_partitions / blockwise API is that the function signatures mix namespaces in a way that is confusing (to me, at least). So divisions, label, columns, etc get picked up in the outer function, whereas **kwargs and *args get passed into the function calls in the actual tasks. One consequence of this is that func can never have any keyword-only arguments like label, which is weird and non-obvious!
I suppose I should expect some pushback, but what would you say to instead collecting args and kwargs in a tuple + dict directly rather than unpacking them into the function signature?
Coming up with an easy-to-describe signature here feels particularly important to me since it's mirrored in the regenerate function.
```python
            new_dependencies[arg.name] if isinstance(arg, CollectionOperation) else arg
            for arg in self.args
        ]
        return type(self)(
```
The type hierarchy here worries me a bit. This seems to be done so that the below selection variants of this class don't need to re-implement regenerate. Which is nice object-oriented stuff! But there's the implicit assumption here that all derived classes also have the same __init__ signature, which seems a bit dicier to me. You satisfy it in these subclasses, but many subclasses do change the __init__ for perfectly valid reasons.
I guess what I'm trying to say is: this doesn't look wrong, but it does look a bit fragile to me.
I think this needs some careful design thought, but I don't know enough about the different objects that one will need. I think there are two ways this can go:
- No subclasses of the base "operation" class have their own init method: so now it must be that the base class can regenerate.
- Subclasses have their own init, but conforming to a contract that the base class advertises.
For option two, arguments to init would separate into "child" data (that is instances of Operation), and "non-child" (other parameters). So one would either need a contract for how to pass this, or a way of advertising which slots are which.
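A minimal sketch of "option two", under the assumption (mine, not the PR's) that subclasses advertise which constructor slots hold child operations and which hold plain parameters, so the base class can implement `regenerate` generically. All names are illustrative:

```python
class Operation:
    # Subclasses advertise which constructor slots hold child operations
    # and which hold plain parameters; regenerate() then works generically
    # for any conforming subclass, whatever its __init__ signature.
    _children: tuple = ()
    _parameters: tuple = ()

    def __init__(self, **kwargs):
        for slot in self._children + self._parameters:
            setattr(self, slot, kwargs[slot])

    @property
    def name(self):
        return type(self).__name__.lower()

    def regenerate(self, new_dependencies, **new_kwargs):
        slots = self._children + self._parameters
        kwargs = {slot: getattr(self, slot) for slot in slots}
        for slot in self._children:
            child = getattr(self, slot)
            kwargs[slot] = new_dependencies.get(child.name, child)
        kwargs.update(new_kwargs)
        return type(self)(**kwargs)


class ReadSource(Operation):
    _parameters = ("path",)


class Select(Operation):
    _children = ("frame",)
    _parameters = ("columns",)


read = ReadSource(path="data.parquet")
sel = Select(frame=read, columns=["x"])
# Swap in a regenerated dependency without knowing Select's signature:
sel2 = sel.regenerate({"readsource": ReadSource(path="other.parquet")})
```

The alternative contract (a single keyword-only `__init__` shape shared by all subclasses) trades this bookkeeping for a stricter signature requirement.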
dask/dataframe/operation.py (outdated):

```python
    @name.setter
    def name(self, value):
        self._name = value
```
The design of this feels to me like operations should be immutable, and creating one with different attributes should go through regenerate(). What do you think about that as a constraint, and changing meta, name, and divisions to being getters only?
My experience is that operating under constraints like that (and type annotations) is sometimes annoying, but pays off in the long run.
Definite agreement on this point from me.
Yes - We absolutely need a CollectionOperation object to be immutable, and so we should throw an error if the meta/divisions/etc setter is used. However, we do need to allow these properties to be set for the specific case of CompatFrameOperation (for now).
Does this make sense? I will try to make this contract a bit clearer as I clean things up.
Can you not just set the private attribute e.g. _name directly without creating a public setter for it?
wence-
left a comment
A few more minor comments/thoughts on read-through
dask/dataframe/operation.py (outdated):

```python
    applied_filters: list | None = None
    projected_columns: set | None = None
```
Does it make sense for these properties to be represented by separate Operation objects, rather than being properties of every DataFrameOperation?
e.g.

```python
class ProjectColumns:
    def __init__(self, op, *columns):
        self.op = op
        self._columns = frozenset(columns)
```

Now one can also make the DataFrameOperation hashable (right now it isn't, since some of its properties are not).

Then, column projection pushes the `ProjectColumns` operation as close to the leaves as possible (hopefully one ends up with `ProjectColumns(SomeDataFrame, "column_name")`), and generation of the lower-level graph from this can fuse these two things.
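A minimal runnable sketch of that pushdown idea (the classes are hypothetical stand-ins, and the rewrite rule is valid only because element-wise operations act column-by-column):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SomeDataFrame:
    name: str


@dataclass(frozen=True)
class Elemwise:
    op: object


@dataclass(frozen=True)
class ProjectColumns:
    op: object
    columns: frozenset


def push_projections(expr):
    # Move a ProjectColumns node below element-wise nodes until it sits
    # directly on a leaf, where graph generation could fuse the two.
    if isinstance(expr, ProjectColumns) and isinstance(expr.op, Elemwise):
        pushed = ProjectColumns(expr.op.op, expr.columns)
        return Elemwise(push_projections(pushed))
    return expr


expr = ProjectColumns(
    Elemwise(Elemwise(SomeDataFrame("df"))), frozenset({"column_name"})
)
optimized = push_projections(expr)
# optimized == Elemwise(Elemwise(ProjectColumns(SomeDataFrame("df"), ...)))
```

Because every node is a frozen dataclass, the whole expression stays hashable, which is what lets the memoizing-visitor machinery above cache traversals.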
So, looking over this briefly my understanding is that the design here is ...

Earlier you mentioned an idea where we removed the Mapping-style graph and instead treated the collection (or in this case operation) as that graph. It seems like this is not implemented here yet, correct? Do you have a plan here? Stepping back a bit, I'm seeing a lot of low-level review on this PR, which is great. However, I'm also curious about first thinking a bit about the plan presented here and making larger changes first if we think that they're going to be useful.
Yes. This particular PR is experimenting with a few things that are likely to go away or change, but the primary concept that I do think is quite promising is the idea of adding a new …
Right now, the state of a collection is completely decoupled from graph materialization (and optimization), and the overall idea here is to improve this pain point. Without doing something like this, I don’t think we have much of a chance of ever supporting general graph optimization.
I think the answer to this is “yes and no,” but I may be misunderstanding. My long-term vision was that the “operation” can replace the need for the HLG/Layer API altogether. However, my ideas for graph materialization do still require an operation to know how to generate the necessary graph to produce a specified set of output keys (which is still “mapping”-like, even if the operation object is not a `Mapping`).

Although my long-term vision was to replace HLGs, I don’t think there is a practical path for completely bailing on HLG/Layers all at once. In fact, one variation on the current POC is to allow a `CollectionOperation` to (optionally) manage its own HLG subgraph. That is, we could lean on the graph-materialization logic we already have (for now), but use the cleaner `CollectionOperation` API for "high-level" optimization passes.
This PR is absolutely exploratory, and so I am not proposing a clear API or plan just yet. However, I do think I will be pushing hard for a design where the collection state and graph materialization logic live in the same place (like …
The reviews so far have been very helpful to me. However, I agree that general design suggestions are most valuable at this point.
Let me push on this a little. Why not? If we think that this is a good idea, what would stop us from doing this to all of at least one of the major collections (dataframe, array) before pressing the green button?
Every operation could produce a mapping for the keys that define that operation. We might not need an HLG as well though? In my mind an objective of this work would be to not have to create yet-another-system. HLGs go away and get replaced by operations. I would be sad if we had dataframes storing operations storing HLGs storing layers storing tasks.
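A toy sketch of that direction (illustrative names only, not dask APIs): each operation emits the task mapping for the keys requested of it, and materialization is a plain recursive walk with no intermediate layer objects in between:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FromList:
    name: str
    data: tuple

    @property
    def collection_keys(self):
        return [(self.name, i) for i in range(len(self.data))]

    def subgraph(self, keys):
        # Leaf operation: tasks embed literal data, no dependencies.
        return {k: self.data[k[1]] for k in keys}, {}


@dataclass(frozen=True)
class Elemwise:
    name: str
    func: object
    dep: object

    @property
    def collection_keys(self):
        return [(self.name, i) for _, i in self.dep.collection_keys]

    def subgraph(self, keys):
        # Produce the tasks for the requested keys, plus the keys we
        # need from each dependency to compute them.
        dsk = {(self.name, i): (self.func, (self.dep.name, i)) for _, i in keys}
        return dsk, {self.dep: [(self.dep.name, i) for _, i in keys]}


def generate_graph(op, keys=None):
    keys = keys if keys is not None else op.collection_keys
    dsk, deps = op.subgraph(keys)
    for dep, dep_keys in deps.items():
        dsk.update(generate_graph(dep, dep_keys))
    return dsk


leaf = FromList("leaf", (1, 2, 3))
inc = Elemwise("inc", lambda x: x + 1, leaf)
graph = generate_graph(inc)  # a plain task mapping: operations -> tasks
```

Culling falls out for free here: asking `generate_graph(inc, [("inc", 0)])` only ever materializes the leaf tasks that key actually needs.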
👍
Sounds great. I like the idea of going back and forth. I'd just want to make sure that at some point we take the learnings from this PR, take a step back, and propose a plan.
Update: The current plan is to implement something similar to what I have here (and much more) in the …
It's great to see a lot of thought being put into fundamental improvements to the data structures! I was also very excited to see the design ideas around high-level expressions and so am wondering how (if at all) that fits into the work here? Even if they're off the table for now, it might be an idea to consider them in the design to make any future retro-fitting of expression graphs as easy / least disruptive as possible.
Good question @dhirschfeld ! The final goal of the proposed solution here is to capture the same behavior/features as a high-level expression system. However, the primary difference is in the proposed development path for getting to the final design (and the specifics of *where* the expression tree lives).

The general background here is that the current state of HighLevelGraph/Layer does not provide what we need for general “high-level” optimization and/or efficient graph generation. There seem to be at least three different ways that we can attack these challenges:

1. HLE-Focused Solution - Completely redesign the existing collection APIs to become high-level expressions themselves
2. HLG/Layer-Focused Solution - Push information about the collection state into HLG/Layers and redefine the Layer contract to provide what we need for efficient graph generation/handling
3. Something in the middle (i.e. This Proposal) - Incrementally move the collection APIs toward solution (1) by adding an HLE-like property to the existing collection APIs

From my perspective, the original HLE plan stalled because an incremental development path was just too difficult to articulate and justify. @jcrist described some of these challenges, and pointed out that we might as well improve the existing HLG/Layer infrastructure to fit our needs (solution 2 above). Therefore, when I set out to explore a tractable solution here, my original intention was to revise the existing HLG/Layer design. The reason I eventually moved away from an HLG-based solution was that it was much easier for me to imagine an incremental development plan for something completely separate. I am still very open to other solutions. However, my gut tells me that we need to start by moving collection properties (like `meta`, `divisions`, etc.) into the object that is responsible for graph materialization. My understanding is that this critical step is easiest for solution 3, but I could be wrong.
|
From my perspective these are all the same thing. We're just trying to find the least painful way to get to the same place :)
Yep! The main differences are definitely in the "pathway" to the solution.
WARNING: DO NOT MERGE
This is a rough/experimental implementation of the plan suggested in #8980. The specific design needs a lot of work, but I am hoping this PR will eventually serve as a basic reference implementation for the general "plan" if we eventually decide to pursue it.
General Idea: Rather than depending on the `HighLevelGraph`/`Layer` for graph optimization and materialization, this PR proposes that we add a `Layer`-like class, called `CollectionOperation`, and use it to manage collection metadata and graph materialization/optimization logic. The key advantage is that we resolve the longstanding challenge of having the HLG know nothing about the collection that created it.

What this POC has so far:
So far, this PR only implements `CollectionOperation` support for DataFrame collections, and it only supports `read_parquet`, `from_map`, and element-wise operations (via `elemwise`). The following limitations are also important to consider:

- To use the `CollectionOperation` backend, `use_operation_api=True` must be passed to the IO function.
- A collection only uses the `CollectionOperation` backend if its operation is not a place-holder `CompatFrameOperation` (which simply wraps a traditional `HighLevelGraph`). Operations that do not support the `CollectionOperation` backend will simply force input collection(s) to generate an HLG (with a single `MaterializedLayer`) when their `dask` attribute is called. In the future, when HLG-Pickle support is finished, we can define a special compatibility `Layer` object to avoid premature materialization.
- When the `dask` attribute is called on a `CollectionOperation`-based collection, the raw graph is materialized with culling and fusion baked into the `generate_graph` logic defined in the `CollectionOperation`. In the future, it probably makes sense to add an option to avoid fusion, but the long-term plan is to avoid the need for an explicit `cull` operation.

Basic usage example:
Note that, since element-wise operations will be automatically fused at graph materialization time, using `ddf4.visualize()` shows something a bit different:

Note that we can also call `optimize_operation` on the `CollectionOperation`-based collection to perform column projection:

What is still missing:
A lot. This is not even close to ready.
Still need to iterate on this design (probably a lot) to get a sense of whether it will both simplify development and enable useful graph optimizations in the future. I could certainly use help from people with expertise related to high-level expressions (cc @eriknw)