Conversation
Force-pushed from 516ade9 to 1a8e4a0
jrbourbeau
left a comment
Thanks for your work here @ian-r-rose. It looks like there are some merge conflicts which are preventing CI from running. When you get a chance, would you mind merging/rebasing master?
Force-pushed from d0c3740 to 469450f
We've had some out-of-band discussion about the ability to annotate specific collections with things like `c.compute(x, retries={x: 2})`. This worked for some collections, but not others. In 6e0695b I remove that ability, so that commit should give a good overview of the consequences. This is a breaking change, but perhaps of functionality that was not widely used. These per-collection annotations can be accomplished with the new syntax using something like

    with dask.annotate(retries=2):
        x = da.ones((1, 1))
    c.compute(x)

This largely works, with the caveat that dask/dask#7036 should be fixed to get around some surprising results.
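The context-manager scoping that `dask.annotate` relies on can be mimicked with a small pure-Python sketch (illustrative names only, not dask's actual implementation): metadata set inside a `with` block applies to everything constructed in that block and is restored on exit.

```python
import contextlib
import contextvars

# Hypothetical stand-in for dask.annotate: a context variable holds the
# currently active annotations; nested blocks layer new values on top.
_annotations = contextvars.ContextVar("annotations", default={})

@contextlib.contextmanager
def annotate(**new):
    token = _annotations.set({**_annotations.get(), **new})
    try:
        yield
    finally:
        _annotations.reset(token)  # restore the annotations of the outer scope

def current_annotations():
    return dict(_annotations.get())

with annotate(retries=2):
    inner = current_annotations()       # {'retries': 2}
    with annotate(priority=10):
        nested = current_annotations()  # {'retries': 2, 'priority': 10}
outer = current_annotations()           # {} — nothing leaks out of the block
```

Graph construction inside the block would read `current_annotations()` and attach the result to the layer being built, which is how per-collection settings survive without any `{collection: value}` mapping at compute time.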
Force-pushed from ab6c85e to e63cbbe
mrocklin
left a comment
Whoops. It looks like I might have had some lingering comments in review. Submitting now, although they may be out of date.
    assert s.host_restrictions[L[2].key] == {b.ip}

    with pytest.raises(ValueError):
        c.map(inc, [10, 11, 12], workers=[{a.ip}])
I'm curious why these were removed.
I was generally trying to avoid any expansion of keys/collections on the client, instead preferring to only allow a worker-or-iterable-of-workers. With that API restriction, the unpacking/expanding logic on the scheduler is much simpler. So I view this as philosophically aligned with removing the ability to expand { collection: worker } on the client.
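The restriction described above — accept a single worker or an iterable of workers, and reject per-key/per-collection mappings so expansion stays on the scheduler — could be sketched like this (a hypothetical helper for illustration, not the actual distributed client code, which raises its own error types):

```python
# Sketch of client-side validation for a workers= argument: a plain address
# string or an iterable of address strings is accepted; anything that would
# require expanding collections or keys on the client is rejected.
def normalize_workers(workers):
    if isinstance(workers, str):
        return [workers]
    if isinstance(workers, dict):
        raise TypeError(
            "per-collection {collection: worker} mappings are not supported; "
            "use dask.annotate(workers=...) instead"
        )
    try:
        workers = list(workers)
    except TypeError:
        raise TypeError("workers= must be an address or an iterable of addresses")
    if not all(isinstance(w, str) for w in workers):
        raise TypeError("workers= must contain only address strings")
    return workers
```

With this shape, the scheduler only ever receives a flat list of addresses, which keeps its unpacking logic simple.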
ian-r-rose
left a comment
Thanks for taking a look @mrocklin, I've responded to your comments inline.
I would also flag: there are a lot of tests which now have optimize_graph=False. This is due to
dask/dask#7036, and should give an indication of what kinds of situations that affects.
Checking in. How are we doing here?

I think this is ready for review from my perspective. Tests are passing (except some that seem flaky and unrelated?), the major question in my mind is whether the backwards-incompatibilities introduced are worth it, and what to do about dask/dask#7036.

@sjperkins are you around to review?

Sure, will take a look over the next day or two. Thanks for working on this @ian-r-rose
sjperkins
left a comment
I've added some comments: I'm most interested in the one concerning annotation expansion.
> I would also flag: there are a lot of tests which now have optimize_graph=False. This is due to dask/dask#7036, and should give an indication of what kinds of situations that affects.

I might suggest commenting each optimize_graph=False case to indicate that it should be removed when dask/dask#7036 is fixed. It's a bit laborious and ugly but I can't think of a better way to handle this at present.
            new_annotations[k] = merge(annotations[k], v)
        else:
            new_annotations[k] = v
    annotations.update(new_annotations)
I understand this change provides backwards compatibility for the existing per-key Client.{submit, persist} kwarg taxonomy (retries, priority, workers)?
The functionality in _materialized_layer_unpack mirrors Layer.__dask_distributed_unpack__ and all its subclasses. They also need to expand and unpack annotations.
I wonder if it would be better to place this functionality in Layer.expand_annotations so that it is automatically applied through the Layer class hierarchy? It would require a sister dask PR though.
Thanks for pointing this out. This is actually not for backwards compatibility of per-key/collection annotations, but instead is needed to handle the case where different HLG Layers have different retries/priority/workers.
For example, if we are unpacking a two-layer HLG, and Layer A has retries:2 while Layer B has retries:4, we need to make sure that the unpacking keeps both of those annotations. The current implementation has a shallow merge, which makes later-unpacked layers overwrite the annotations of previously-unpacked layers. To make the above case work, we need a bit deeper of a merge.
For the same reason, I also think this logic needs to be one step above Layer.expand_annotations.
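The two-layer retries example above can be made concrete with a small sketch (a hypothetical helper mirroring the behavior discussed, not dask's actual code): with a shallow merge, the later layer's "retries" dict replaces the earlier one's, while a per-name deep merge keeps both layers' keys.

```python
# Deeper merge of annotations across layers: when two layers carry the same
# annotation name with per-key dict values, merge the dicts key-by-key
# instead of letting the later layer overwrite the earlier one.
def merge_layer_annotations(existing, incoming):
    merged = dict(existing)
    for name, value in incoming.items():
        if name in merged and isinstance(merged[name], dict) and isinstance(value, dict):
            merged[name] = {**merged[name], **value}  # keep both layers' keys
        else:
            merged[name] = value                      # shallow fallback
    return merged

layer_a = {"retries": {("a", 0): 2}}  # Layer A: retries=2
layer_b = {"retries": {("b", 0): 4}}  # Layer B: retries=4

shallow = {**layer_a, **layer_b}                     # drops Layer A's retries
deep = merge_layer_annotations(layer_a, layer_b)     # keeps both
```

Here `shallow` ends up as `{"retries": {("b", 0): 4}}`, losing Layer A's annotation, while `deep` retains the entries for both layers.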
Yes, you're right. Thanks for correcting my understanding.
`new_annotations[k] = {**annotations[k], **v}` may be faster than tlz.merge now that dask supports Python >= 3.6
> The functionality in _materialized_layer_unpack mirrors Layer.__dask_distributed_unpack__ and all its subclasses. They also need to expand and unpack annotations.
I think this is still an issue, we need to apply the same changes to Blockwise.__dask_distributed_unpack__ and SimpleShuffleLayer.__dask_distributed_unpack__.
Maybe it makes more sense to redesign the annotations by removing the annotations argument from Layer.__dask_distributed_unpack__ and then have highlevelgraph_unpack() expand and unpack annotations just after unpack_func(layer["state"], dsk, deps).
This will simplify all the __dask_distributed_unpack__ implementations and remove duplicate code. The only downside is that it limits Layers to do a specialized annotation expansion, which no one is doing currently. Unless I am missing something, I think that is a great trade-off.
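The centralized design suggested here can be sketched as follows (hypothetical function names; the expansion rule follows dask's convention that a callable annotation is evaluated per key while a plain value is broadcast to every key, but this is an illustration rather than the library's implementation):

```python
# After each layer's unpack_func has produced its tasks, the graph-level
# unpack routine expands that layer's annotations over the layer's keys,
# so individual __dask_distributed_unpack__ implementations need no
# annotation logic of their own.
def expand_annotations(annotations, keys):
    expanded = {}
    for name, value in annotations.items():
        if callable(value):
            # a callable annotation is evaluated once per key
            expanded[name] = {k: value(k) for k in keys}
        else:
            # a plain value is broadcast to every key in the layer
            expanded[name] = {k: value for k in keys}
    return expanded

layer_keys = [("x", 0), ("x", 1)]
out = expand_annotations({"retries": 2, "priority": lambda k: k[1]}, layer_keys)
```

Each `__dask_distributed_unpack__` then only needs to return its tasks and dependencies; the annotation handling lives in one place.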
Thanks for the detailed discussion @sjperkins and @madsbk. I've tried to do something like a compromise solution in dask/dask#7102, and I'd love to hear your thoughts on it. It takes a somewhat minimal approach to API changes (so __dask_distributed_unpack__() still takes the annotations), and avoids further imports of distributed there. Overall the strategy I'm trying out is:

- Move the default implementation of layer unpacking into Layer.
- Move the logic of doing a deeper merge on annotations into a new Layer.merge_annotations, similar to Layer.expand_annotations. I am not overly attached to this division of responsibility, so if there are suggestions for how to make it cleaner, I'd be happy to hear them.
- Ensure that Layer, Blockwise, and ShuffleLayer all unpack annotations properly (previously the latter two failed).
LGTM @ian-r-rose but I think @sjperkins has a point, we should unify everything in Dask and change the API such that layers can delegate the annotation unpacking to Layer.__dask_distributed_unpack__().
I suggest that we split this work into multiple PRs. Let's rollback this PR to the state that didn't require any significant change to the API and get it merged. It greatly simplifies the code and is a win in all cases.
A followup PR can then modify the API based on the design thoughts we have been discussing here. I will be happy to initiate the followup PR, I think I understand @sjperkins's design ideas and concerns now :)
Thanks for the discussion @madsbk and @ian-r-rose . I agree with moving Layer API changes forward to a new PR, so don't let my concerns hold this one up. LGTM
@madsbk I'd be happy to have this be across multiple PRs. Can you specify what API changes you would want to roll back? This now depends on dask/dask#7102, but that is somewhat more of a bugfix than a major API change.
I could bring the default implementation of Layer.__dask_distributed_unpack__() back here and then xfail the test that catches dask/dask#7102 , if that's what you mean.
Sorry my bad, I thought you changed the API in the last commit. I am totally fine with this PR+dask/dask#7102
  mod = import_allowed_module(layer["__module__"])
  unpack_func = getattr(mod, layer["__name__"]).__dask_distributed_unpack__
- unpack_func(layer["state"], dsk, deps, annotations)
+ unpack_func(layer["state"], dsk, deps, out_annotations)
To me this implies that Layer annotations always take precedence over the existing Client.{submit, persist} kwarg taxonomy. I think this makes sense in terms of backwards compatibility (specific annotations override global annotations)
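That precedence ordering can be shown with a minimal sketch (hypothetical helper; not the actual unpack code): the global client kwargs form the base, and layer annotations are applied on top.

```python
# Resolve annotations so that layer-level (specific) annotations override
# client-kwarg (global) annotations for any name present in both.
def resolve_annotations(client_kwargs, layer_annotations):
    return {**client_kwargs, **layer_annotations}

out = resolve_annotations(
    {"retries": 1, "priority": 0},  # global values from the client call
    {"retries": 5},                 # a layer's own annotation wins
)
```

Here `out` keeps `priority` from the client kwargs but takes `retries` from the layer, matching the "specific overrides global" reading above.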
@mrocklin You mentioned offline that passing in […] For simplicity's sake, this PR has tried to completely move such annotations to live at the HLG Layer level, and remove the older ways of doing it, including […] I've tried to avoid a proliferation of such special cases in the pack/unpack logic. We could revisit that, but at least at the moment, retries/priority/workers/etc only take very simple non-graph objects.
madsbk
left a comment
Overall this looks great @ian-r-rose but I think we should consider simplifying the annotation design in order to avoid duplicate annotation implementations in each Layer (see comment).
Previously we had two systems to send per-task metadata like retries or
workers or priorities to the scheduler.
1. Older system with explicit workers= keywords and expand_foo functions
2. Newer system with annotations
The annotations system is nicer for a few reasons:
1. It's more generic
2. It's more consistent (there were some bugs in the expand foo
functions, especially when dealing with collections)
3. We ship values up on a per-layer basis rather than a per-key basis
This work-in-progress commit rips out the old system and uses the new system,
but it is still missing a lot:
1. It only implements this for the Client.compute method.
We need to repeat this for persist, submit, and map
2. It doesn't handle the allow_other_workers -> loose_restrictions
conversion anywhere yet. (this will need to be added to the
scheduler)
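The per-layer rather than per-key shipping mentioned in the commit message above can be illustrated with a small sketch (illustrative only, not the actual wire format): for a layer of n keys sharing one setting, the per-layer form sends a single value while the per-key form sends n entries.

```python
# Contrast the old per-key shipping with the new per-layer shipping for a
# layer of 1000 keys that all share retries=2.
keys = [("x", i) for i in range(1000)]

per_key = {"retries": {k: 2 for k in keys}}  # 1000 entries on the wire
per_layer = {"retries": 2}                   # one value for the whole layer
```

The scheduler can expand the per-layer value over the layer's keys after unpacking, so nothing is lost by sending the compact form.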
Truncated commit messages:
- rebase fail
- … side and pass them unmodified to the scheduler.
- … version of doing things.
- … compute/persist. These can still be accomplished in an annotation context.
Force-pushed from 999b04b to 066c00e
n.b., now that this depends on dask/dask#7102, tests are failing, but they should pass once that PR is resolved
Force-pushed from 066c00e to b98a1f9
How are we doing here?

dask/dask#7102 (on which this now depends) has been merged, I think this is ready (test failures look unrelated to me)

My sense is this is good for a final review and merge.

Note, I've been ignoring travis failures for a while now, for better or worse.
jrbourbeau
left a comment
Thanks for all your work on this @ian-r-rose! I've left a few comments below, but overall the changes here look great. Thanks for adding a bunch of tests.
We might also consider updating https://distributed.dask.org/en/latest/resources.html to reflect the new annotations approach for collections
distributed/tests/test_client.py (outdated)
@pytest.mark.skipif(
    not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
)
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_restrictions_get_annotate(c, s, a, b):
Since the workers= annotation also supports worker addresses in addition to worker hostnames, I think we can update this to remove the pytest.mark.skipif while testing the same behavior:
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index eb1c362d..0a271596 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -957,15 +957,12 @@ async def test_restrictions_get(c, s, a, b):
assert len(b.data) == 0
-@pytest.mark.skipif(
- not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
-@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
+@gen_cluster(client=True)
async def test_restrictions_get_annotate(c, s, a, b):
x = 1
- with dask.annotate(workers=a.ip):
+ with dask.annotate(workers=a.address):
y = delayed(inc)(x)
- with dask.annotate(workers=b.ip):
+ with dask.annotate(workers=b.address):
z = delayed(inc)(y)
     futures = c.get(z.__dask_graph__(), [y.key, z.key], sync=False)

This is particularly nice given our current linux testing situation on Travis.
@nodebug  # test timing is fragile
@gen_cluster(nthreads=[("127.0.0.1", 1)] * 3, client=True)
async def test_persist_workers(e, s, a, b, c):
Just for my own understanding, it looks like test_persist_workers and test_compute_workers have been updated to specifically not run tasks on a worker, which is easier to specify with the workers= keyword, instead of running certain collections on certain workers. We're now covering the certain-collections-on-certain-workers functionality with the new test_persist_workers_annotate and test_compute_workers_annotate tests. Is that correct?
I've updated them to persist/compute on a given set of workers, but it is no longer possible to delegate certain tasks/keys to a specific worker in a persist/compute call. In order to do that, you now need to use the annotation framework.
> We're now covering the certain collections on certain workers functionality with the new test_persist_workers_annotate and test_compute_workers_annotate tests. Is that correct?

Correct
Just wanted to point out that the CI is starting to fail here (xref https://github.com/dask/distributed/pull/4467/checks?check_run_id=1786579413) because of the updates in dask/dask#7102. Merging this PR will resolve the AttributeError: type object 'Layer' has no attribute 'expand_annotations' issue.
jrbourbeau
left a comment
Thanks @ian-r-rose for working on this and @sjperkins @madsbk @mrocklin for reviewing!
Thanks for the reviews, all!

Thanks @ian-r-rose and congratulations on treading a path through all the discussion :-D
Continues/supersedes #4347, using the task annotation machinery to specify workers/priority/retries/etc when submitting code to the scheduler. Fixes #4262