Conversation
ping @sjperkins - having also worked on this feature you might have some thoughts on the implementation here.
Thanks for pinging me @dhirschfeld. @madsbk, I think this is a great improvement on dask/distributed#2180. Placing the annotation on a functools wrapper (rather than at the end of the task tuple, as in dask/distributed#2180) makes it far easier to set and get the annotation. A possible downside (minor, compared to its advantages) is an increase in the size of the serialized graph due to the functools wrapper; this should be simple to test. However, I've always considered an annotation on every graph task a worst-case scenario; I think it's far more likely that only a few tasks will be annotated.

I think the challenge going forward will be handling the interaction between annotations and the optimization code, especially with regard to task fusion. Initially I thought some sort of logic hook would be needed to decide how to fuse tasks with differing annotations, but thinking about it a bit more I believe that tasks with differing annotations should simply not be fused at all: firstly, because catering to nested tasks would make the threaded and distributed schedulers more complex, and secondly, because I don't think annotations generally combine well. For example:
Another area to think about is the Blockwise object and the related optimize_blockwise method. Once again, if the encapsulated functions have differing annotations I think the layers should simply not be fused. Tasks with the same annotation should remain candidates for fusion.
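As a rough illustration of the fusion rule being proposed here (a sketch only; `get_annotation` and the `annotation` attribute are hypothetical names, not part of this PR's API):

```python
# Sketch of the "don't fuse across differing annotations" rule.
# A task is assumed to be a tuple whose first element is the callable,
# and an annotation (if any) is assumed to live on that callable.

def get_annotation(task):
    """Return the annotation attached to a task's callable, or None."""
    return getattr(task[0], "annotation", None)

def can_fuse(task_a, task_b):
    """Only tasks whose annotations match are candidates for fusion."""
    return get_annotation(task_a) == get_annotation(task_b)
```

Under this rule two unannotated tasks fuse as before, while an annotated task never fuses with a differently-annotated (or unannotated) one, so no combining logic is ever needed.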
```python
# We know that it is _always_ beneficial to prioritize the
# getitem() task because it makes it possible for Dask to free
# the output of shuffle_group() as fast as possible.
```
This statement sounds true of most getitems. Most of the time the output will be smaller than the input. What makes it especially important for the shuffle-split workload?
Agreed, if we are getting all of the parent task's output so that it can be freed, prioritizing getitem() most often makes sense.
But in the general case getitem() could imply any kind of memory/compute usage, e.g. getting an item from a zict dictionary can be very expensive.
Right, in this case I think that the condition for this being useful is that the memory use of all of the dependents is less than or equal to the size of the dependency (equal in this case).
If we're submitting this graph along with other graphs at the same time (maybe the shuffle is part of a larger computation) will this also prioritize the shuffle code above those other parts of the graph, or will things still operate normally?
> If we're submitting this graph along with other graphs at the same time (maybe the shuffle is part of a larger computation) will this also prioritize the shuffle code above those other parts of the graph, or will things still operate normally?
This PR effectively makes the scheduler treat the getitem() tasks and their parent shuffle_group() task as a single node. The priority of the shuffle code as a whole is not changed, so things should still operate normally.
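A toy graph of the shape under discussion may help; the function bodies below are stand-ins for the real shuffle machinery, chosen only to show the one-splitter-to-many-getitems structure whose pieces get scheduled together:

```python
# Toy shuffle-shaped graph: one shuffle_group-style task feeding
# several getitem tasks. If every getitem runs right after the split,
# the split's (large) output can be freed as early as possible.

def shuffle_group(seq):
    # Split into even/odd buckets (stand-in for the real shuffle split).
    return {0: [x for x in seq if x % 2 == 0],
            1: [x for x in seq if x % 2 == 1]}

def getitem(d, i):
    return d[i]

dsk = {
    "split": (shuffle_group, list(range(10))),
    "even": (getitem, "split", 0),
    "odd": (getitem, "split", 1),
}
```

Prioritizing `"even"` and `"odd"` immediately after `"split"` does not reorder this subgraph relative to unrelated graphs submitted alongside it; it only keeps the splitter and its getitems adjacent.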
```python
for k, v in dsk.items():
    if istask(v):
```
FYI, the order benchmarks from dask/dask-benchmarks don't show any slowdowns here. I thought that these two lines might be expensive, but if they are we don't catch it in those benchmarks.
I am both pleased and surprised by this
In other words, "I don't trust the benchmarks" :)
I'll see if I can make a benchmark that stresses this, because it really seems like it should have some cost.
Oh, I wasn't trying to cast doubt on the benchmarks. I'm genuinely pleased that this didn't have much of an effect. I haven't really taken much of a look at the benchmarks to know what's going on in them to be able to say either way. I'm mostly trusting that you and Erik have this under control.
👍
FWIW I tried a bit but couldn't make this block take more than ~3% or so of the run time. What's the worst-case scenario here? My thought was something like
```diff
diff --git a/dask/benchmarks/order.py b/dask/benchmarks/order.py
index 2268fe3..620d3f7 100644
--- a/dask/benchmarks/order.py
+++ b/dask/benchmarks/order.py
@@ -1,5 +1,6 @@
 from dask import array as da
 from dask.base import collections_to_dsk
+from dask.core import get_dependencies
 from dask.order import order

 from .common import DaskSuite
@@ -162,3 +163,19 @@ class OrderManySubgraphs(DaskSuite):

     def time_order_many_subgraphs(self, param):
         order(self.dsk)
+
+
+class TimeOrderMisc(DaskSuite):
+    def setup(self):
+        # A long linear chain of trivial tasks: lots of keys, little
+        # ordering work, so the per-key pass dominates.
+        dsk = {"0": (0,)}
+        for i in range(1, 1_000_000):
+            dsk[str(i)] = (len, str(i - 1))  # `len` stands in for any callable
+
+        dependencies = {k: get_dependencies(dsk, k) for k in dsk}
+
+        self.dsk = dsk
+        self.dependencies = dependencies
+
+    def time_order_silly(self):
+        order(self.dsk, dependencies=self.dependencies)
```
- A large task graph (since this is making another pass over dsk.items())
- A very simple task graph (less time in the rest of the order code)
Even with that, I get only a slight slowdown (7.58s -> 8.2s).
It wouldn't be the first time a benchmark proved our intuition wrong 😉
mrocklin left a comment:
I've attached some comments and questions below.
Additionally, this is a big change for the project, and probably one that requires a decent amount of community involvement. It would be good to engage #3783 and figure out what the right long term approach should be. There are people there who, I think, care somewhat deeply about this topic and we should get their approval.
I think that this approach has a lot going for it in terms of being a lightweight modification of the current system, but a change like this is low level enough that I'd also like to make it only once.
```python
# Check task annotations
for k, v in dsk.items():
    if istask(v):
        f = v[0]  # We prioritize based on the first task function
```
What if the task is fused? It seems like this approach, while simple, may silently ignore user annotations if they have gone through task fusion (which is common).
True, in this case we only check the first function. We could use functions_of() to check all nested functions, with the extra overhead that entails.
Alternatively, as @sjperkins suggests, the fusers could also check the annotations and act accordingly.
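For readers unfamiliar with the nested-task problem: a fused task is a tuple whose arguments may themselves be tasks, so checking only `v[0]` misses inner callables. A self-contained sketch of walking the whole structure (re-implementing `istask` locally so the snippet needs nothing from dask; dask.core provides the real one):

```python
# Collect every callable appearing in a (possibly fused/nested) task.

def istask(x):
    # A dask task is a tuple whose first element is callable.
    return isinstance(x, tuple) and len(x) > 0 and callable(x[0])

def all_functions(task):
    """Recursively gather every function in a nested task expression."""
    funcs = set()
    if istask(task):
        funcs.add(task[0])
        for arg in task[1:]:
            funcs |= all_functions(arg)
    elif isinstance(task, list):
        for item in task:
            funcs |= all_functions(item)
    return funcs
```

Checking annotations on every function returned by such a walk is what "the extra overhead that entails" refers to: it touches every element of every task rather than one element per task.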
```python
# We know that it is _always_ beneficial to prioritize the
# getitem() task because it makes it possible for Dask to free
# the output of shuffle_group() as fast as possible.
```
Right, in this case I think that the condition for this being useful is that the memory use of all of the dependents is less than or equal to the size of the dependency (equal in this case).
```python
# We know that it is _always_ beneficial to prioritize the
# getitem() task because it makes it possible for Dask to free
# the output of shuffle_group() as fast as possible.
```
If we're submitting this graph along with other graphs at the same time (maybe the shuffle is part of a larger computation) will this also prioritize the shuffle code above those other parts of the graph, or will things still operate normally?
```python
for k, v in dsk.items():
    if istask(v):
```
I am both pleased and surprised by this
My interest in annotations is to (hopefully) support labelling tasks as being part of a particular job. I have a Dask cluster which can be running multiple jobs simultaneously, each of which may have hundreds of tasks associated with it. My hope is that by taking care to annotate the tasks as part of a particular job when they're created, I can build a UI showing how many jobs are currently running and how many tasks each has remaining. If you're being fancy you could also enable drilling down to see the status of the tasks associated with a particular job, perhaps by enabling filtering of the task view by job/annotation.
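A sketch of what that per-job bookkeeping could look like, assuming (hypothetically) that the job label is attached as a `job` attribute on each annotated task's callable:

```python
from collections import Counter

def count_tasks_per_job(dsk):
    """Count remaining tasks grouped by a hypothetical `job` annotation."""
    counts = Counter()
    for key, task in dsk.items():
        # A dask task is a tuple whose first element is callable;
        # plain data values carry no annotation and are skipped.
        if isinstance(task, tuple) and task and callable(task[0]):
            job = getattr(task[0], "job", None)
            if job is not None:
                counts[job] += 1
    return counts
```

A UI could poll something like this over the scheduler's current graph state to render a jobs-remaining view, rather than one undifferentiated task stream.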
I think we need to discuss this feature in two parts: a loose approach versus a strict approach to annotations.

Currently, this PR is an example of the loose approach. The loose approach makes it easy to introduce new annotations, like the labeling feature @dhirschfeld describes, and it is easy to manage the overhead. On the other hand, the strict approach makes it easy to reason about what information is in the annotation, and we can decide on the semantics of all annotated information. However, in order to manage the performance overhead we would probably have to use Cython, Numba, or some other library to implement the bottlenecks.
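To make the loose approach concrete, here is a minimal sketch (not this PR's exact implementation; `annotate` and `get_annotations` are illustrative names) of carrying an annotation on a `functools.partial` wrapper, so code paths that don't care pay nothing:

```python
import functools

def annotate(func, **annotations):
    """Wrap `func` so arbitrary annotations travel with the callable."""
    wrapper = functools.partial(func)
    # partial objects can carry attributes, per the functools docs.
    wrapper.annotations = annotations
    return wrapper

def get_annotations(func):
    """Components that care read the annotations; everyone else ignores them."""
    return getattr(func, "annotations", {})
```

Because `getattr` with a default succeeds on any callable, unannotated functions and annotated wrappers flow through the same code; only components that explicitly call `get_annotations` incur any cost.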
@madsbk are you still working on this? I think this functionality would enable a lot of further improvements to be built on top of it.
This PR introduces non-intrusive task annotation that makes it possible to annotate tasks with arbitrary information. In order to make this non-intrusive, this implementation annotates the task's callable using functools.partial; thus any component in Dask/Distributed can be completely agnostic of the annotation, without any performance overhead. Overhead is introduced only for components that use the annotation. For instance, this PR makes order.order() traverse the graph keys once in order to check annotations, which is negligible compared to the rest of the computation in order.order().

Inspired by dask/distributed#2180, but this PR is less intrusive because it doesn't change a task's arguments.
Fixes #6054
Closes #6051
Passes black dask and flake8 dask.

cc @rjzamora, @mrocklin, @eriknw, @dhirschfeld