
Non-intrusive task annotations #6059

Closed

madsbk wants to merge 6 commits into dask:master from madsbk:task_annotation

Conversation

@madsbk (Contributor) commented Apr 2, 2020

This PR introduces non-intrusive task annotations, making it possible to annotate tasks with arbitrary information. To keep this non-intrusive, the implementation annotates the task's callable using functools.partial, so any component in Dask/Distributed can remain completely agnostic of the annotation without any performance overhead.

Overhead is introduced only for components that use the annotations. For instance, this PR makes order.order() traverse the graph keys once to check annotations, which is negligible compared to the rest of the computation in order.order().
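
To illustrate the mechanism, here is a minimal sketch; the helper names annotate() and get_annotation() are hypothetical, not this PR's actual API:

import functools

def annotate(func, **annotations):
    """Wrap a task's callable so annotations ride along with it."""
    # functools.partial instances accept arbitrary attributes, so the
    # annotation lives on the callable rather than in the task's arguments.
    wrapper = functools.partial(func)
    wrapper.annotations = annotations
    return wrapper

def get_annotation(task, key, default=None):
    """Read an annotation off a task tuple, if one is present."""
    return getattr(task[0], "annotations", {}).get(key, default)

# An annotated task still looks like an ordinary (callable, args) tuple:
dsk = {"x": (annotate(sum, priority=1), [1, 2, 3])}
assert get_annotation(dsk["x"], "priority") == 1

Components that never ask for annotations see an ordinary callable at the head of the task tuple and pay no extra cost.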

This is inspired by dask/distributed#2180, but it is less intrusive because it doesn't change a task's arguments.
Fixes #6054
Closes #6051

  • Docs added
  • Tests added / passed
  • Passes black dask / flake8 dask

cc. @rjzamora, @mrocklin, @eriknw, @dhirschfeld

@dhirschfeld

ping @sjperkins - having also worked on this feature you might have some thoughts on the implementation here.

@sjperkins (Member)

Thanks for pinging me @dhirschfeld.

@madsbk, I think this is a great improvement on dask/distributed#2180. Placing the annotation on a functools wrapper (rather than at the end of the task tuple in dask/distributed#2180) makes it far easier to set and get the annotation.

A possible (minor, compared to its advantages) downside to the approach might be an increase in the size of the serialized graph due to the functools wrapper. This should be simple to test. However, I've always considered an annotation on every graph task to be a worst-case scenario. I think it's far more likely that only a few tasks will be annotated.

I think the challenge going forward will be handling the interaction between annotations and the optimization code, especially with regard to task fusion.

Initially, I thought some sort of logic hook would be needed to decide how to fuse tasks with differing annotations, but thinking about it a bit more, I think that tasks with differing annotations should simply not be fused at all: firstly, because catering to nested tasks would make the threaded and distributed schedulers more complex, and secondly, because I don't think annotations generally combine well. For example:

  • tasks with differing priorities shouldn't be prioritised the same
  • tasks assigned to different workers shouldn't be assigned to the same worker.
  • tasks with different resource requirements shouldn't be collapsed onto just one of those requirements.

Another area to think about would be the Blockwise object and the related optimize_blockwise method. Once again, if the encapsulated function has differing annotations I think the layers should simply not be fused.

Tasks with the same annotation should be candidates for fusion.
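
Assuming the functools.partial wrapper carries an annotations attribute as sketched in the PR description above, such a fusion guard could be as simple as:

def can_fuse(task_a, task_b):
    """Hypothetical guard: only allow fusing tasks whose annotations match.

    Unannotated tasks have no `annotations` attribute and compare as {},
    so plain tasks still fuse freely with each other.
    """
    ann_a = getattr(task_a[0], "annotations", {})
    ann_b = getattr(task_b[0], "annotations", {})
    return ann_a == ann_b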

Comment on lines +519 to +521
# We know that it is _always_ beneficial to prioritize the
# getitem() task because it makes it possible for Dask to free
# the output of shuffle_group() as fast as possible.
Member:

This statement sounds true of most getitems. Most of the time the output will be smaller than the input. What makes it especially important for the shuffle-split workload?

@madsbk (Contributor, Author) commented Apr 3, 2020

Agreed: when we are fetching all of a parent task's output so that it can be freed, prioritizing getitem() makes sense most of the time.
But in the general case getitem() could imply any kind of memory/compute usage; e.g. getting an item from a zict dictionary can be very expensive.

Member:

Right, in this case I think that the condition for this being useful is that the memory use of all of the dependents is less than or equal to the size of the dependency (equal in this case).

Member:

If we're submitting this graph along with other graphs at the same time (maybe the shuffle is part of a larger computation) will this also prioritize the shuffle code above those other parts of the graph, or will things still operate normally?

@madsbk (Contributor, Author):

> If we're submitting this graph along with other graphs at the same time (maybe the shuffle is part of a larger computation) will this also prioritize the shuffle code above those other parts of the graph, or will things still operate normally?

This PR effectively makes the scheduler treat the getitem() tasks and their parent shuffle_group() task as a single node. The priority of the shuffle code as a whole is not changed, so things should still operate normally.

Comment on lines +355 to +356
for k, v in dsk.items():
    if istask(v):
Member:

FYI, the order benchmarks from dask/dask-benchmarks don't show any slowdowns here. I thought that these two lines might be expensive, but if they are, we don't catch it in those benchmarks.

Member:

I am both pleased and surprised by this

Member:

In other words, "I don't trust the benchmarks" :)

I'll see if I can make a benchmark that stresses this, because it really seems like it should have some cost.

Member:

Oh, I wasn't trying to cast doubt on the benchmarks. I'm genuinely pleased that this didn't have much of an effect. I haven't really taken much of a look at the benchmarks to know what's going on in them to be able to say either way. I'm mostly trusting that you and Erik have this under control.

Member:

👍

FWIW I tried a bit but couldn't make this block take more than ~3% or so of the run time. What's the worst-case scenario here? My thought was something like

diff --git a/dask/benchmarks/order.py b/dask/benchmarks/order.py
index 2268fe3..620d3f7 100644
--- a/dask/benchmarks/order.py
+++ b/dask/benchmarks/order.py
@@ -1,5 +1,6 @@
 from dask import array as da
 from dask.base import collections_to_dsk
+from dask.core import get_dependencies
 from dask.order import order
 from .common import DaskSuite
 
@@ -162,3 +163,19 @@ class OrderManySubgraphs(DaskSuite):
 
     def time_order_many_subgraphs(self, param):
         order(self.dsk)
+
+
+class TimeOrderMisc(DaskSuite):
+    def setup(self):
+        dsk = {'0': (0,)}
+        for i in range(1, 1_000_000):
+            dsk[str(i)] = (f, str(i - 1))
+
+        dependencies = {k: get_dependencies(dsk, k) for k in dsk}
+
+        self.dsk = dsk
+        self.dependencies = dependencies
+
+    def time_order_silly(self):
+        order(self.dsk, dependencies=self.dependencies)
(Here f is assumed to be any cheap task function available to the benchmark module.)

This targets the worst case:

  1. A large task graph (since this is making another pass over dsk.items())
  2. A very simple task graph (less time in the rest of the order code)

Even with that, I get only a slight slowdown (7.58s -> 8.2s).

Member:

It wouldn't be the first time a benchmark proved our intuition wrong 😉

@mrocklin (Member) left a comment

I've attached some comments and questions below.

Additionally, this is a big change for the project, and probably one that requires a decent amount of community involvement. It would be good to engage #3783 and figure out what the right long-term approach should be. There are people there who, I think, care somewhat deeply about this topic, and we should get their approval.

I think that this approach has a lot going for it in terms of being a lightweight modification of the current system, but a change like this is low-level enough that I'd also like to make it only once.

# Check task annotations
for k, v in dsk.items():
    if istask(v):
        f = v[0]  # We prioritize based on the first task function
Member:

What if the task is fused? It seems like this approach, while simple, may silently ignore user annotations if they have gone through task fusion (which is common).

@madsbk (Contributor, Author) commented Apr 4, 2020

True, in this case we only check the first function. We could use functions_of() to check all nested functions, with the extra overhead that entails.
Alternatively, as @sjperkins suggests, the fusers could also check the annotations and act accordingly.
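
For illustration, a recursive walk over the task tuple could collect annotations from nested functions without functions_of(); this is a self-contained sketch, again assuming the hypothetical annotations-attribute convention from the PR description:

from dask.core import istask

def annotations_in_task(task):
    """Collect annotations from every callable nested in a (possibly fused) task."""
    found = {}
    if istask(task):
        found.update(getattr(task[0], "annotations", {}))
        for arg in task[1:]:
            found.update(annotations_in_task(arg))
    elif isinstance(task, list):
        for item in task:
            found.update(annotations_in_task(item))
    return found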


@dhirschfeld commented Apr 5, 2020

My interest in annotations is to (hopefully) support labelling tasks as being part of a particular job.

I have a dask cluster which can be running multiple jobs simultaneously, each of which may have hundreds of tasks associated with it. My hope is that, by taking care to annotate the tasks as part of a particular job when they're created, I can build a UI showing how many jobs are currently running and how many tasks each has remaining, e.g. something like:

JobA - 73/100
JobB - 97/123
JobC - 53/53

...instead of just 223/276 with no idea which jobs are running or what state they're in.

If you're being fancy, you could also enable drilling down to see the status of the tasks associated with a particular job, perhaps by letting the task view be filtered by job/annotation.
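
Reusing the hypothetical annotate()/get_annotation() helpers sketched in the PR description, the per-job grouping itself is straightforward; tracking how many of those tasks remain would additionally need scheduler state:

from collections import Counter
from dask.core import istask

def tasks_per_job(dsk):
    """Count graph tasks by a (hypothetical) 'job' annotation."""
    counts = Counter()
    for task in dsk.values():
        if istask(task):
            counts[get_annotation(task, "job", "<unlabelled>")] += 1
    return counts

dsk = {
    "a": (annotate(sum, job="JobA"), [1, 2]),
    "b": (annotate(sum, job="JobB"), [3, 4]),
}
print(tasks_per_job(dsk))  # Counter({'JobA': 1, 'JobB': 1})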

@madsbk (Contributor, Author) commented Apr 14, 2020

> I think that this approach has a lot going for it in terms of being a lightweight modification of the current system, but a change like this is low-level enough that I'd also like to make it only once.

I think we need to discuss this feature in two parts:

  1. How to implement the task annotation. Do we want a lightweight approach like this PR, or a more intrusive approach that introduces Key and Task classes instead of generic tuples (Key and Task classes #2299)?
  2. What should the semantics of the annotated information be? Do we want a loose approach where components can communicate through annotations using their own protocols, or a stricter approach where all tasks are annotated with specific, well-defined information?

Currently, this PR is an example of the loose approach, where rearrange_by_column_tasks() communicates priority information to order.order() using its own semantics; e.g. in this PR we only check the priority of the first function in a task.

The loose approach makes it easy to introduce new annotations, like the labelling feature @dhirschfeld describes, and it keeps the overhead easy to manage.

On the other hand, the strict approach makes it easy to reason about what information is in the annotations, and we can decide on the semantics of all annotated information up front. However, in order to manage the performance overhead we would probably have to use Cython, Numba, or some other library to implement the bottlenecks.
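
For contrast, a toy version of the strict direction of #2299 might replace the tuple convention with explicit objects; this is purely illustrative and not an API that exists in Dask:

from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class Task:
    function: Callable
    args: tuple = ()
    annotations: dict = field(default_factory=dict)

    def __call__(self) -> Any:
        return self.function(*self.args)

t = Task(sum, ([1, 2, 3],), annotations={"priority": 1})
assert t() == 6 and t.annotations["priority"] == 1

Every task would then carry a well-defined annotations slot that all components can rely on, at the cost of touching every place in Dask/Distributed that currently assumes plain tuples.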

@lr4d (Contributor) commented Oct 9, 2020

@madsbk are you still working on this? I think this functionality would enable a lot of further improvements to be built on top of it.

@mrocklin (Member) commented Oct 9, 2020

@lr4d @fjetter if you all are interested in annotations, I would love to get your thoughts on #6701

@madsbk madsbk closed this Dec 18, 2020