Conversation

@fjetter (Member) commented Jul 24, 2024

This is an early version that will close #9969

It introduces a new Task class (name is subject to change) and a couple of other related subclasses that should replace the tuple as a representation of runnable tasks.

The benefits of this are outlined in #9969 and are primarily focused on reducing overhead during serialization and parsing of results. An important result is also that we can trivially cache functions (and arguments, if we wish) to avoid problems like dask/distributed#8767, where users erroneously provide expensive-to-pickle functions (which also happens frequently in our own code and/or downstream projects like xarray).

This approach allows us to convert the legacy dsk graph to the new representation with full backwards compatibility. Old graphs can be migrated and new ones written directly using this new representation which will ultimately reduce overhead.
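
To make the shape of the change concrete, here is a minimal, self-contained sketch with hypothetical stand-in classes (the real classes live in this PR, and their exact signatures may differ):

class TaskRef:
    """Reference to another task's output, identified by key."""
    def __init__(self, key):
        self.key = key

class Task:
    """Runnable node: a key, a callable, and already-parsed arguments."""
    def __init__(self, key, func, /, *args, **kwargs):
        self.key = key
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, results):
        # Resolve references to previously computed keys, then run.
        args = [results[a.key] if isinstance(a, TaskRef) else a
                for a in self.args]
        return self.func(*args, **self.kwargs)

def add(x, y):
    return x + y

# Legacy: runnable tasks are encoded tuples; bare strings double as keys.
legacy_dsk = {"a": 1, "b": (add, "a", 10)}

# New: tasks are objects, and references to other keys are explicit.
new_dsk = {"a": 1, "b": Task("b", add, TaskRef("a"), 10)}
assert new_dsk["b"]({"a": 1}) == 11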

I will follow up with measurements shortly.

Sibling PR in distributed dask/distributed#8797

@github-actions bot (Contributor) commented Jul 24, 2024

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

13 files ±0    13 suites ±0    3h 9m 31s ⏱️ +7m 47s
13 222 tests +39    12 166 ✅ +39    1 056 💤 ±0    0 ❌ ±0
137 910 runs +507    118 882 ✅ +497    19 028 💤 +10    0 ❌ ±0

Results for commit 8aed17b. ± Comparison against base commit a6d0bdc.

♻️ This comment has been updated with latest results.


_T = TypeVar("_T", bound="BaseTask")


class WrappedKey:
Member Author:

This had to move from distributed since we'd otherwise get horrible circular imports

return typ(*args, **kwargs)


def convert_old_style_task(k, arg, all_keys, only_refs) -> BaseTask:
Member Author:

this is very similar to unpack_remotedata

return new_dsk


class KeyRef:
Member Author:

TODO: Maybe this should inherit from WrappedKey or be replaced by it entirely.

Member:

For me, calling this class TaskRef would make more sense since it does reference a Task via its key.

Suggested change
class KeyRef:
class TaskRef:

Member Author:

I'm actually leaning towards nuking this class entirely in favor of WrappedKey (keeping that for backwards compat) but I'm happy to have TaskRef as an alias

return True


_auto = object()
Member Author:

todo: unused

@phofl (Collaborator) left a comment

gave this a first look and looks good!

"literal-string" ~ Literal("key", "literal-string")
Keys, Aliases and KeyRefs
Collaborator:

How would I rename a result that was previously stored under a different key?

{"key": KeyRef("old-key")}?

Member Author:

yes, this

@hendrikmakait hendrikmakait self-requested a review July 30, 2024 14:52
@fjetter (Member Author) commented Jul 31, 2024

In case anybody reviews: I just had a conversation with Hendrik, and here are a couple of expected concerns that were raised and discussed:

  • Sequence and Dict of tasks are relatively useless, and we may be better off killing them entirely in favor of an ordinary task that encodes the list/dict construction (see the sketch below this list). During development this was a (kind of) helpful construct, but I wouldn't expect it to be used by any user. The user API is pretty much only Task and KeyRef
  • LiteralTask is by itself not very useful, but it maintains an abstraction where all values of a low-level graph obey the same callable structure, which makes it quite natural to recurse into. Again, this wouldn't be used by end users (and likely not even devs outside of the parsing code). (The literal renders the normalization I'm talking about in order: remove data task graph normalization #11263 irrelevant)
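
For the first bullet, a rough sketch of the two representations for a literal list of tasks (hypothetical, minimal stand-ins, not the PR's actual classes):

class Task:
    def __init__(self, key, func, /, *args):
        self.key, self.func, self.args = key, func, args

class SequenceOfTasks:
    def __init__(self, key, tasks):
        self.key, self.tasks = key, tasks

# Option A: a dedicated container node holding sub-tasks.
node_a = SequenceOfTasks(
    "k", [Task("t1", str.upper, "a"), Task("t2", str.upper, "b")]
)

# Option B: an ordinary Task whose callable builds the list.
node_b = Task(
    "k", lambda *parts: list(parts),
    Task("t1", str.upper, "a"), Task("t2", str.upper, "b"),
)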

return super().__sizeof__() + sizeof(self.tasks)


class SequenceOfTasks(BaseTask):
Member Author:

Note that I opted to keep this after all. It is not something end users (or even developers) really have to touch but it makes internals of the Task class much smoother. I'm not married to it, though, and could see it being removed down the line again. It would just require a lot more iterating over things, checking for types, etc.

Collaborator:

Personally, I am fine adding some "boilerplate" structures if this makes things easier to understand. I like that these classes are different; it makes the things you have to consider in a single case easier

Member:

I'm not necessarily against these classes, I mostly dislike their name which suggests less magic than they actually perform. This might be rather something like a Sequence{Consolidation|Conflation|Combination|Comprehension|Reduction}Task

Member Author:

I don't feel like they are doing much magic, and I'm not sure what kind of surprises are hidden here.
I don't find that the names you suggested help with any of this ambiguity.

Member:

I'm happy not to do anything here; this can always be adjusted later should it remain a source of confusion.

Member Author:

There is an open question about the public API surface.

I'm inclined to make this module private for now but export something like Task or WrappedKey at the top level; I don't have a strong opinion here.

Collaborator:

+1

@fjetter (Member Author) left a comment

Right now, the Task signature is key: Key, func: Callable, args: tuple, kwargs: dict but it is also possible to make it key: Key, func: Callable, /, *args: Any, **kwargs: Any which would make it more natural to write a task, e.g. Task(key, func, arg1, arg2, kwarg1='foo').

I chose not to go down this path because it makes internals a little more complex (namely, we'd have to either make SequenceOfTasks iterable or remove it entirely). The additional complexity is OK if we want this API change.
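
For illustration, a sketch of the two signatures under discussion (hypothetical stand-in classes, not the PR's implementation):

class TaskContainers:
    # key: Key, func: Callable, args: tuple, kwargs: dict
    def __init__(self, key, func, args=(), kwargs=None):
        self.key, self.func = key, func
        self.args, self.kwargs = args, kwargs or {}

class TaskSplat:
    # key: Key, func: Callable, /, *args: Any, **kwargs: Any
    def __init__(self, key, func, /, *args, **kwargs):
        self.key, self.func = key, func
        self.args, self.kwargs = args, kwargs

def concat(x, y, sep=" "):
    return f"{x}{sep}{y}"

t1 = TaskContainers("k", concat, args=("a", "b"), kwargs={"sep": "-"})
t2 = TaskSplat("k", concat, "a", "b", sep="-")  # reads like the call itself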

@phofl (Collaborator) left a comment

This change is only hooked into distributed? I.e., if we get an old-style graph, then the sync scheduler will still execute the old-style graph?

Wouldn't we want to do https://github.com/dask/distributed/pull/8797/files#r1731514684 client side to reduce the upload size?

len(vals) == 1
and k not in (keys or ())
and k in dsk
and not isinstance(dsk[k], BaseTask)
Collaborator:

I like that this is explicit now

return Task(k, identity, (func, *new_args))
return Task(key=k, func=func, args=tuple(new_args))
try:
if isinstance(arg, (bytes, int, float, str, tuple)) and arg in all_keys:
Collaborator:

a key can be bytes??? or am I missing something here?

@fjetter (Member Author) commented Aug 27, 2024

Theoretically, yes. This is a remnant of Python 2 compat, where str was of type bytes and not unicode.

We should change this, but I never put in the time to clean everything up. FWIW, these two things would need to change:

dask/dask/core.py

Lines 202 to 217 in 2fbe18b

def iskey(key: object) -> bool:
    """Return True if the given object is a potential dask key; False otherwise.

    The definition of a key in a Dask graph is any str, bytes, int, float, or tuple
    thereof.

    See Also
    --------
    ishashable
    validate_key
    dask.typing.Key
    """
    typ = type(key)
    if typ is tuple:
        return all(iskey(i) for i in cast(tuple, key))
    return typ in {bytes, int, float, str}

Key: TypeAlias = Union[str, bytes, int, float, tuple["Key", ...]]

(and whatever pops up with mypy then).

I haven't used iskey here for perf reasons. The function dispatch is actually noticeable here.

Collaborator:

Thanks!

Collaborator:

+1


isinstance(t, Alias)
and t.key not in keys
and t.key != k
and t.key in dsk
Collaborator:

t.key not being in dsk means that we access a persisted result for example?

Member Author:

yes

Collaborator:

Can you add a brief comment for this as well? It took me a while to come to that conclusion.

and t.key not in keys
and t.key != k
and t.key in dsk
and len(dependents[t.key]) == 1
Collaborator:

This is protection if t.key is actually an input for a non-trivial task?

We would also catch a key that is an alias for 2 different results?

Member Author:

This behavior is tested in test_resolve_aliases

dsk = {
    "bar": "foo",
    "foo": (func, "a", "b"),
    "baz": "bar",
    "foo2": (func, "bar", "c"),
}

will convert to

{
    'bar': Task('foo'),
    'baz': Alias('bar'),
    'foo2': Task('foo2')
}

i.e. bar points to foo, and foo is only used by bar. Therefore, bar will just inherit the task. (Now that I put this here, I notice that we should likely change the key of that Task object as well.)

However, bar is being used in baz and foo2 so inlining it would require us to compute it twice.

Collaborator:

Can you add a short comment? It took me a while to remember what exactly this was supposed to catch.

@fjetter (Member Author) commented Aug 27, 2024

Wouldn't we want to do https://github.com/dask/distributed/pull/8797/files#r1731514684 client side to reduce the upload size?

That would force materialization client side as well... which may be ok... I have to think about this a little more.

It may be the case that we're currently always materializing things. Arrays are materialized because low-level fusion is still enabled, and dataframes are materialized because they are using dask-expr. I guess the only cases where this is not true are some trivial things like Client.map.

@phofl (Collaborator) commented Aug 27, 2024

That would force materialization client side as well... which may be ok... I have to think about this a little more.

Yeah, I think maybe we only do this if the graph is materialized already? Not sure if this would add a lot of complexity, though.

]
def inlinable(key, task):
if (
not isinstance(task, BaseTask)
Member:

Are Tasks never inlineable? Is this because we inline elsewhere?

Member Author:

this is because legacy and new don't play well with each other.

The convert_legacy_* functions can handle a mix of tasks, but other layers cannot. (So far, only distributed-only graphs come with Task objects, so the conversion in distributed alone is sufficient.) The very next step here should be a similar change in dask/dask to allow internals to assume we're working in the new system.

TL;DR: for now, legacy low-level optimization is/should be disabled whenever a BaseTask is encountered.


if (
isinstance(t, Alias)
and t.key not in keys
and t.key != k
Member:

What would actually happen in this self-referential case?

Member Author:

At the very least, dask.order raises a "cycles found" exception, because strictly speaking a graph with this kind of self-reference is not a DAG.

We had similar logic in the distributed scheduler before, so without this kind of filtering nothing will work. It's just a question of where we do this.


@fjetter (Member Author) commented Aug 27, 2024

I added a commit that changes the signature to Task(key, func, /, *args, **kwargs), which sometimes makes internals a little more awkward but is much more intuitive to use, I think.

__weakref__: Any = None
__slots__ = tuple(__annotations__)

def __init__(self, key: KeyRef | KeyType):
Member:

The Alias class feels slightly off. It's the only BaseTask class that does not return the key under which it is (or at least should be) registered in dsk; instead, it returns the aliased key. I think the intention is to make it simple to skip the Alias, but we have to implement code for optimizing Aliases away nonetheless, since users could also just explicitly use the key from dsk. This might be a pain we're facing because the existing API forces us into allowing Aliases. A more explicit variant would be something like Alias(old, new)

Member Author:

An earlier version distinguished those, and I very often had Alias(key, key) objects, which felt a little redundant, but I think I can make it work. It also struck me as weird to allow the key attribute to be different.

Alias(key, ref=None) seems appropriate

Member Author:

I refactored this in 60df5d9

I used target for the variable name instead of ref or new (since new is not always accurate, sometimes this is a genuine reference, and ref is already taken)

Member Author:

This also makes the resolve_aliases code (the function name is not ideal) a little easier to read imo.

@hendrikmakait (Member) commented Aug 28, 2024

Sounds good, target definitely works. I wanted to avoid something ambiguous like Alias(key, alias) where it's unclear what the original name is.
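
For reference, a minimal sketch of the refactored shape (hypothetical stand-in; the actual class lives in this PR's task-spec module):

class Alias:
    # `key` is the name this node is registered under in dsk;
    # `target` is the key whose result it re-exports. If omitted,
    # the alias is self-naming.
    def __init__(self, key, target=None):
        self.key = key
        self.target = target if target is not None else key

# "baz" re-exports the result stored under "bar":
dsk = {"baz": Alias("baz", target="bar")}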

Comment on lines 388 to 369
_func_cache: MutableMapping = {}
_func_cache_reverse: MutableMapping = {}
Member Author:

Note this cache is unbounded. I didn't want to use an LRU (also because the actually usable class for this is in distributed, and the functools cache, etc. is not good). Also, it's unclear how large the LRU should be.

Ideally, this would be a cache that works on bytes size so we could use zict, etc. for this, but I didn't want to deal with that complexity just yet.
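
A rough sketch of the caching idea (assumed shape with hypothetical helper names; the PR's actual cache may differ):

import pickle

_func_cache: dict = {}          # pickled payload -> callable
_func_cache_reverse: dict = {}  # callable -> pickled payload

def dumps_function(func):
    # Serialize func once; later tasks using the same function reuse
    # the cached bytes instead of pickling it again.
    try:
        return _func_cache_reverse[func]
    except KeyError:
        payload = pickle.dumps(func)
        _func_cache[payload] = func
        _func_cache_reverse[func] = payload
        return payload

def loads_function(payload):
    # Deserialize once per distinct payload.
    try:
        return _func_cache[payload]
    except KeyError:
        func = pickle.loads(payload)
        _func_cache[payload] = func
        return func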

# 1. The target key is not in the keys set. The keys set is what the
# user is requesting and by collapsing we'd no longer be able to
# return that result.
# 2. The target key is in fact part of dsk. If it isnt' this could
Member:

Suggested change
# 2. The target key is in fact part of dsk. If it isnt' this could
# 2. The target key is in fact part of dsk. If it isn't this could

@fjetter (Member Author) commented Aug 29, 2024

FYI: progress is a little hampered. I'm running into the wildest, pretty much unrelated, problems. Just to name a couple:

  • There was a thing in our tokenization code
  • Something in dask-expr is generating corrupt code
  • Some scheduler state machine corruption
  • ...

I'm doing my best to isolate those, but they are often a little hard to reproduce, and I have to cross-check whether it is a problem on main as well.

dask/base.py Outdated
Comment on lines 1098 to 1100
else:
seen = seen.copy()
tok = _seen.set(seen)
Member Author:

So, I'm running into an issue on dask/distributed in the test test_threadsafe_get. The test launches 30 threads that concurrently hammer the scheduler with the same computation request over and over again, i.e. the tokens should always be identical and the result should in all likelihood even be reused between all the threads.
However, I'm running into problems, and this appears to fix it. I haven't managed to isolate it (I already worked with RLocks in various places, which didn't help). The problem seems to be a counting issue in the seen dictionary, but I couldn't reduce it or write a test for it.

I'm slightly concerned about this fix since we call this function quite a lot, and copying a non-empty set can be expensive (haven't done any measurements yet)

@fjetter (Member Author) commented Aug 30, 2024

I removed this again. A workaround for this is to use tokenize directly instead of normalize_token for the GraphNode classes. This way, the likelihood of having those nested self-referencing constructs is much smaller / they don't show up. It's also better from a performance perspective since normalize_token can generate ridiculously deeply nested structures.

@fjetter fjetter force-pushed the task_spec_class branch 4 times, most recently from 527e583 to 2d52549 Compare September 5, 2024 10:26
@hendrikmakait (Member) left a comment

Thanks, @fjetter! This looks overall good to me. I have a few non-blocking comments regarding consistent wording and adding issues for remaining TODOs in the code.

return self.value

def __repr__(self):
return f"Literal({self.key}, type={self.typ}, {self.value})"
Member:

Suggested change
return f"Literal({self.key}, type={self.typ}, {self.value})"
return f"DataNode({self.key}, type={self.typ}, {self.value})"

""" Task specification for dask
This module contains the task specification for dask. It is used to represent
runnable and non-runnable (i.e. literals) tasks in a dask graph.
Member:

Suggested change
runnable and non-runnable (i.e. literals) tasks in a dask graph.
runnable (task) and non-runnable (data) nodes in a dask graph.

{"a": func("b")} ~ DictOfTasks("key", {"a": Task("a", func, "b")})
"literal-string" ~ Literal("key", "literal-string")
Member:

Suggested change
"literal-string" ~ Literal("key", "literal-string")
"literal-string" ~ DataNode("key", "literal-string")

def inline(self, dsk) -> GraphNode:
raise NotImplementedError("Not implemented")

def propagate_literal(self) -> GraphNode:
Member:

Should we stay consistent and rename this to something like propagate_data? There are a few other places where the word literal is being used; we may want to check for consistency there as well.

(no_new_edges or height < max_depth_new_edges)
and (
not isinstance(dsk[parent], GraphNode)
# TODO: substitute can be implemented with GraphNode.inline
Member:

Should we add an issue for this?


@fjetter (Member Author) commented Sep 5, 2024

FYI I'm currently working on removing the Sequence/Dict classes. I suspect there could be a perf gain. I'll report back once it is stable. Either way, we can move forward with this: without any further changes in distributed or in this repo, this change alone is pretty harmless.

@fjetter (Member Author) commented Sep 6, 2024

I have a follow-up ready that addresses all of the above review comments and removes the sequence/dict tasks. To keep the review simple, I'll move forward with merging this PR (it will not affect anything yet).



Development

Successfully merging this pull request may close these issues.

Abandon encoded tuples as task definition in dsk graphs
