Add a Task class to replace tuples for task specification #11248
Conversation
Unit Test Results: 13 files ±0, 13 suites ±0, 3h 9m 31s ⏱️ +7m 47s. Results for commit 8aed17b; comparison against base commit a6d0bdc.
dask/task_spec.py (Outdated):

```python
_T = TypeVar("_T", bound="BaseTask")


class WrappedKey:
```
This had to move from distributed since we'd get otherwise horrible circular imports
dask/task_spec.py (Outdated):

```python
    return typ(*args, **kwargs)


def convert_old_style_task(k, arg, all_keys, only_refs) -> BaseTask:
```
this is very similar to unpack_remotedata
dask/task_spec.py (Outdated):

```python
    return new_dsk


class KeyRef:
```
TODO: Maybe this should inherit from WrappedKey or be replaced by it entirely.
For me, calling this class TaskRef would make more sense since it does reference a Task via its key.
Suggested change:

```diff
-class KeyRef:
+class TaskRef:
```
I'm actually leaning towards nuking this class entirely in favor of WrappedKey (keeping that for backwards compat) but I'm happy to have TaskRef as an alias
dask/task_spec.py (Outdated):

```python
    return True


_auto = object()
```
todo: unused
phofl left a comment:
gave this a first look and looks good!
dask/task_spec.py (Outdated):

```python
    "literal-string" ~ Literal("key", "literal-string")

    Keys, Aliases and KeyRefs
```
How would I rename a result that was previously stored under a different key?
{"key": KeyRef("old-key")}?
yes, this
In case anybody reviews: I just had a conversation with Hendrik, and here are a couple of expected concerns that were raised and discussed.
Force-pushed ac42674 to 25b6c9d.
Force-pushed 5bcfacd to 3ed7958.
dask/task_spec.py (Outdated):

```python
    return super().__sizeof__() + sizeof(self.tasks)


class SequenceOfTasks(BaseTask):
```
Note that I opted to keep this after all. It is not something end users (or even developers) really have to touch but it makes internals of the Task class much smoother. I'm not married to it, though, and could see it being removed down the line again. It would just require a lot more iterating over things, checking for types, etc.
Personally, I am fine adding some "boilerplate" structures if this makes things easier to understand. I like that these classes are different, this makes the things you have to consider in a single case easier
I'm not necessarily against these classes, I mostly dislike their name which suggests less magic than they actually perform. This might be rather something like a Sequence{Consolidation|Conflation|Combination|Comprehension|Reduction}Task
I don't feel like they are doing much magic and I'm not sure what kind of surprises are hidden here.
I don't find the names you suggested to help with any of this ambiguity.
I'm happy not to do anything here; this can always be adjusted later should it remain a source of confusion.
There is an open question about the public API surface.
I'm inclined to make this module private for now but export something like Task or WrappedKey at the top level; I don't have a strong opinion here.
+1
fjetter left a comment:
Right now, the Task signature is key: Key, func: Callable, args: tuple, kwargs: dict but it is also possible to make it key: Key, func: Callable, /, *args: Any, **kwargs: Any which would make it more natural to write a task, e.g. Task(key, func, arg1, arg2, kwarg1='foo').
I chose to not go down this path because it makes internals a little more complex (namely we'd have to either make SequenceOfTasks iterable or remove it entirely). The additional complexity is OK if we want this API change
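The two signature options can be sketched side by side. These are hypothetical minimal stand-ins to show the difference in call style, not the PR's actual classes:

```python
from typing import Any, Callable

# Option 1 (the signature described above): explicit args tuple / kwargs dict
class TaskV1:
    def __init__(self, key, func: Callable, args: tuple = (), kwargs=None):
        self.key, self.func = key, func
        self.args, self.kwargs = args, kwargs or {}

# Option 2 (the alternative): positional-only key/func plus *args/**kwargs,
# which reads like an ordinary function call
class TaskV2:
    def __init__(self, key, func: Callable, /, *args: Any, **kwargs: Any):
        self.key, self.func = key, func
        self.args, self.kwargs = args, kwargs

t1 = TaskV1("k", max, args=(1, 2))       # arguments wrapped explicitly
t2 = TaskV2("k", max, 1, 2)              # arguments written naturally
```

Both store the same data; the difference is purely ergonomic, which is why the trade-off against internal complexity is the deciding factor.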
phofl left a comment:
This change is only hooked into distributed? I.e. if we get an old-style graph then the sync scheduler will still execute the old-style graph?
Wouldn't we want to do https://github.com/dask/distributed/pull/8797/files#r1731514684 client side to reduce the upload size?
dask/optimization.py (Outdated):

```python
    len(vals) == 1
    and k not in (keys or ())
    and k in dsk
    and not isinstance(dsk[k], BaseTask)
```
I like that this is explicit now
```python
return Task(k, identity, (func, *new_args))
return Task(key=k, func=func, args=tuple(new_args))
try:
    if isinstance(arg, (bytes, int, float, str, tuple)) and arg in all_keys:
```
a key can be bytes??? or am I missing something here?
Theoretically, yes. This is a remnant of Python 2 compat where str was of type bytes and not unicode.
We should change this, but I never put in the time to clean everything up. FWIW, these two things would need to change:
Lines 202 to 217 in 2fbe18b
```python
def iskey(key: object) -> bool:
    """Return True if the given object is a potential dask key; False otherwise.

    The definition of a key in a Dask graph is any str, bytes, int, float, or tuple
    thereof.

    See Also
    --------
    ishashable
    validate_key
    dask.typing.Key
    """
    typ = type(key)
    if typ is tuple:
        return all(iskey(i) for i in cast(tuple, key))
    return typ in {bytes, int, float, str}
```
Line 28 in 2fbe18b
```python
Key: TypeAlias = Union[str, bytes, int, float, tuple["Key", ...]]
```
(and whatever pops up with mypy then).
I haven't used iskey here for perf reasons. The function dispatch is actually noticeable here
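A quick sanity sketch of that trade-off: the iskey body is copied from the snippet above, while inlined_check is a hypothetical name for the inline isinstance test used in the converted code.

```python
from typing import cast

def iskey(key: object) -> bool:
    # copied from the dask snippet quoted above
    typ = type(key)
    if typ is tuple:
        return all(iskey(i) for i in cast(tuple, key))
    return typ in {bytes, int, float, str}

def inlined_check(arg: object) -> bool:
    # The cheaper inline test: it avoids a (recursive) function call per
    # argument.  It is deliberately looser than iskey -- e.g. it accepts
    # any tuple regardless of its contents -- because the subsequent
    # `arg in all_keys` membership test does the precise filtering.
    return isinstance(arg, (bytes, int, float, str, tuple))

for value in [b"k", 3, 1.5, "x", ("x", 0)]:
    assert iskey(value) and inlined_check(value)
```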
Thanks!
+1
dask/task_spec.py (Outdated):

```python
    isinstance(t, Alias)
    and t.key not in keys
    and t.key != k
    and t.key in dsk
```
t.key not being in dsk means that we access a persisted result for example?
yes
Can you add a brief comment for this as well? Took me a while to come to that conclusion
dask/task_spec.py (Outdated):

```python
    and t.key not in keys
    and t.key != k
    and t.key in dsk
    and len(dependents[t.key]) == 1
```
This is protection if t.key is actually an input for a non-trivial task?
We would also catch a key that is an alias for 2 different results?
This behavior is tested in test_resolve_aliases:

```python
dsk = {
    "bar": "foo",
    "foo": (func, "a", "b"),
    "baz": "bar",
    "foo2": (func, "bar", "c"),
}
```

will convert to

```python
{
    "bar": Task("foo"),
    "baz": Alias(bar),
    "foo2": Task("foo2"),
}
```

i.e. bar is pointing to foo and foo is only used by bar. Therefore, bar will just inherit the task. (Now that I put this here, I notice that we should likely change the key of that Task object as well.)
However, bar is being used in baz and foo2, so inlining it would require us to compute it twice.
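The collapsing rule discussed here can be sketched on plain dicts. This is a toy model, not dask's implementation: aliases are represented as bare strings naming another key, tasks as tuples, and the guard conditions mirror the ones quoted from the diff.

```python
def collapse_aliases(dsk, keys):
    """Toy model: fold an alias into its target only when the target has
    exactly one dependent and is not itself requested by the user."""
    dependents = {}
    for k, v in dsk.items():
        if isinstance(v, str) and v in dsk:
            dependents.setdefault(v, set()).add(k)
        elif isinstance(v, tuple):
            for a in v[1:]:
                if a in dsk:
                    dependents.setdefault(a, set()).add(k)
    out = dict(dsk)
    for k, v in dsk.items():
        if (
            isinstance(v, str)                    # k is an alias
            and v not in keys                     # target is not requested by the user
            and v != k                            # not self-referential
            and v in dsk                          # target is actually part of dsk
            and len(dependents.get(v, ())) == 1   # k is the target's sole dependent
        ):
            out[k] = out.pop(v)                   # k inherits the target's task
    return out
```

Running it on the graph from the test above collapses foo into bar but keeps the bar alias used by baz and foo2, matching the described outcome.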
Can you add a short comment? It took me a while to remember what exactly this was supposed to catch.
That would force materialization client-side as well... which may be OK... I have to think about this a little more. It may be the case that we're currently always materializing things. Arrays are materialized because low-level fusion is still enabled, and dataframes are materialized because they are using dask-expr. I guess the only case where this is not true are some trivial things like …
Yeah, I think maybe we only do this if the graph is materialized already? Not sure if this would add a lot of complexity though.
dask/optimization.py (Outdated):

```python
]

def inlinable(key, task):
    if (
        not isinstance(task, BaseTask)
```
Are Tasks never inlineable? Is this because we inline elsewhere?
this is because legacy and new don't play well with each other.
The convert_legacy_* functions can handle a mix of tasks, but other layers cannot. (So far, only distributed-only graphs come with Task objects, so the conversion in distributed alone is sufficient.) The very next step here should be a similar change in dask/dask to allow internals to assume we're working in the new system.
TL;DR: for now, legacy low-level optimization is/should be disabled whenever a BaseTask is encountered.
dask/task_spec.py (Outdated):

```python
if (
    isinstance(t, Alias)
    and t.key not in keys
    and t.key != k
```
What would actually happen in this self-referential case?
at the very least, dask.order raises a "cycles found" exception, because strictly speaking a graph with this kind of self-reference is not a DAG.
We had similar logic in the distributed scheduler before, so without this kind of filtering nothing will work. It's just a question of where we do this.
I added a commit that changed the signature to …
dask/_task_spec.py (Outdated):

```python
    __weakref__: Any = None
    __slots__ = tuple(__annotations__)

    def __init__(self, key: KeyRef | KeyType):
```
The Alias class feels slightly off. It's the only BaseTask class that does not return the key with which it's also registered - or at least, should be registered with - into dsk. Instead, it returns the aliased key. I think that the intention is to make it simple to skip the Alias, but we have to implement code for optimizing Aliases away nonetheless since users could also just explicitly use the key from dsk. This might be a pain we're facing because the existing API forces us into allowing Aliases. A more explicit variant would be something like Alias(old, new)
An earlier version distinguished those, and I very often had Alias(key, key) objects, which felt a little redundant, but I think I can make it work. It also struck me as weird to have the key attribute allowed to be different.
Alias(key, ref=None) seems appropriate.
I refactored this in 60df5d9
I used target for the variable name instead of ref or new (since new is not always true, sometimes this is a genuine reference and ref is already taken)
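A minimal sketch of what the refactored node could look like, based on the names in this discussion (key plus an optional target) — this is an illustration, not the actual dask implementation:

```python
class Alias:
    """Sketch: a graph node registered under ``key`` that resolves to
    ``target``.  Omitting ``target`` gives the redundant Alias(key, key)
    case mentioned above, so it defaults to the key itself."""

    __slots__ = ("key", "target")

    def __init__(self, key, target=None):
        self.key = key
        self.target = key if target is None else target

# A genuine rename: registered under "new-key", points at "old-key"
renamed = Alias("new-key", "old-key")
# Trivial form: points at itself
trivial = Alias("x")
```

With an explicit target the original name is unambiguous, which was the concern with a hypothetical Alias(key, alias) spelling.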
This also makes the resolve_aliases (function name is not ideal) code a little easier to read imo
Sounds good, target definitely works. I wanted to avoid something ambiguous like Alias(key, alias) where it's unclear what the original name is.
dask/_task_spec.py (Outdated):

```python
_func_cache: MutableMapping = {}
_func_cache_reverse: MutableMapping = {}
```
Note this cache is unbounded. I didn't want to use an LRU (also because the actually usable class for this is in distributed, and the functools cache, etc. is not good). Also, it's unclear how large the LRU should be.
Ideally, this would be a cache that works on byte size so we could use zict, etc., but I didn't want to deal with this complexity just yet.
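For illustration, a byte-size-bounded variant along the lines described could look like the following. This is a hypothetical sketch, not part of the PR (which keeps a plain unbounded dict), and the class name and keying scheme are invented for the example:

```python
import pickle
from collections import OrderedDict

class BytesBoundedCache:
    """Illustrative pickle cache bounded by total stored byte size.
    Least-recently-used entries are evicted once the total exceeds
    ``max_bytes`` (a toy stand-in for what zict would provide)."""

    def __init__(self, max_bytes: int):
        self.max_bytes = max_bytes
        self._data: "OrderedDict[str, bytes]" = OrderedDict()
        self._size = 0

    def dumps(self, func) -> bytes:
        # Simplistic key; real code would need something more robust
        key = getattr(func, "__qualname__", repr(func))
        if key in self._data:
            self._data.move_to_end(key)  # mark as recently used
            return self._data[key]
        blob = pickle.dumps(func)
        self._data[key] = blob
        self._size += len(blob)
        while self._size > self.max_bytes and len(self._data) > 1:
            _, evicted = self._data.popitem(last=False)  # drop oldest entry
            self._size -= len(evicted)
        return blob

cache = BytesBoundedCache(max_bytes=1_000_000)
```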
```python
# 1. The target key is not in the keys set. The keys set is what the
#    user is requesting and by collapsing we'd no longer be able to
#    return that result.
# 2. The target key is in fact part of dsk. If it isnt' this could
```
Suggested change:

```diff
-# 2. The target key is in fact part of dsk. If it isnt' this could
+# 2. The target key is in fact part of dsk. If it isn't this could
```
FYI: progress is a little hampered. I'm running into the wildest, pretty much unrelated, problems, just to name a couple: …
I'm doing my best to isolate those, but they are often a little hard to reproduce, and I have to cross-check whether it is a problem on main as well.
dask/base.py (Outdated):

```python
else:
    seen = seen.copy()
tok = _seen.set(seen)
```
So, I'm running into an issue on dask/distributed in the test test_threadsafe_get. The test launches 30 threads that concurrently hammer the scheduler with the same computation request over and over again, i.e. the tokens should always be identical and the result should in all likelihood even be reused between all the threads.
However, I'm running into problems, and this appears to fix it. I haven't managed to isolate it (I already worked with RLocks in various places, which didn't help). The problem seems to be a counting issue in the seen dictionary, but I couldn't reduce it or write a test for it.
I'm slightly concerned about this fix since we call this function quite a lot and copying a non-empty set can be expensive (I haven't done any measurements yet).
I removed this again. A workaround for this is to use tokenize directly instead of normalize_token for the GraphNode classes. This way, the likelihood of having those nested self referencing constructs is much smaller / doesn't show up. It's also better from a performance perspective since normalize_token can generate ridiculously deeply nested structures.
Force-pushed 527e583 to 2d52549.
hendrikmakait left a comment:
Thanks, @fjetter! This looks overall good to me. I have a few non-blocking comments regarding consistent wording and adding issues for remaining TODOs in the code.
```python
        return self.value

    def __repr__(self):
        return f"Literal({self.key}, type={self.typ}, {self.value})"
```
Suggested change:

```diff
-        return f"Literal({self.key}, type={self.typ}, {self.value})"
+        return f"DataNode({self.key}, type={self.typ}, {self.value})"
```
```python
"""Task specification for dask

This module contains the task specification for dask. It is used to represent
runnable and non-runnable (i.e. literals) tasks in a dask graph.
```
Suggested change:

```diff
-runnable and non-runnable (i.e. literals) tasks in a dask graph.
+runnable (task) and non-runnable (data) nodes in a dask graph.
```
```python
    {"a": func("b")} ~ DictOfTasks("key", {"a": Task("a", func, "b")})
    "literal-string" ~ Literal("key", "literal-string")
```
Suggested change:

```diff
-    "literal-string" ~ Literal("key", "literal-string")
+    "literal-string" ~ DataNode("key", "literal-string")
```
```python
    def inline(self, dsk) -> GraphNode:
        raise NotImplementedError("Not implemented")

    def propagate_literal(self) -> GraphNode:
```
Should we stay consistent and rename this to something like propagate_data? There are a few other places where the word literal is being used, we may want to check for consistency there as well.
```python
    (no_new_edges or height < max_depth_new_edges)
    and (
        not isinstance(dsk[parent], GraphNode)
        # TODO: substitute can be implemented with GraphNode.inline
```
Should we add an issue for this?
FYI: I'm currently working on removing the Sequence/Dict classes. I suspect there could be a perf gain. I'll report back once it is stable. Either way, we can move forward with this: without any further changes in distributed or in this repo, this change alone is pretty harmless.
I have a follow-up ready that addresses all of the above review comments and removes the sequence/dict tasks. In favor of having a simple review, I'll move forward with merging this PR (it will not affect anything yet).
Force-pushed 2d52549 to 8aed17b.
This is an early version that will close #9969
It introduces a new Task class (name is subject to change) and a couple of other related subclasses that should replace the tuple as a representation of runnable tasks. The benefits of this are outlined in #9969 but are primarily focused on reducing overhead during serialization and parsing of results. An important result is also that we can trivially cache functions (and arguments, if we wish) to avoid problems like dask/distributed#8767, where users erroneously provide expensive-to-pickle functions (which also happens frequently in our own code and/or downstream projects like xarray).
This approach allows us to convert the legacy dsk graph to the new representation with full backwards compatibility. Old graphs can be migrated and new ones written directly using this new representation which will ultimately reduce overhead.
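For illustration, the shape of the change from tuple tasks to task objects, sketched with hypothetical minimal stand-ins for the new classes (real signatures in the PR may differ):

```python
from operator import add

# Old style: a runnable task is a tuple whose first element is a callable
dsk_old = {"a": 1, "b": (add, "a", 1)}

# New style, sketched with toy stand-ins for the classes this PR adds
class DataNode:
    def __init__(self, key, value):
        self.key, self.value = key, value

class Task:
    def __init__(self, key, func, args=(), kwargs=None):
        self.key, self.func = key, func
        self.args, self.kwargs = args, kwargs or {}

    def __call__(self, results):
        # Toy argument resolution: replace args that are keys of
        # already-computed results, pass everything else through
        args = [results.get(a, a) for a in self.args]
        return self.func(*args, **self.kwargs)

dsk_new = {
    "a": DataNode("a", 1),
    "b": Task("b", add, args=("a", 1)),
}
```

The object form makes the callable, its arguments, and the key explicit, which is what enables caching functions and avoids re-parsing nested tuples during serialization.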
I will follow up with measurements shortly.
Sibling PR in distributed dask/distributed#8797