Conversation
|
This is a good catch @ian-r-rose. LGTM |
jrbourbeau
left a comment
There was a problem hiding this comment.
Thanks @ian-r-rose! Just a couple of small comments, otherwise the changes here LGTM
dask/blockwise.py
Outdated
| expanded = cls.expand_annotations(state["annotations"], raw.keys()) | ||
| cls.merge_annotations(annotations, expanded) |
There was a problem hiding this comment.
It's not a blocker for merging but with the changes here and in dask/distributed#4406 it looks like we never call expand_annotations without a subsequent call to merge_annotations. Should we combine them into a single method to avoid accidentally calling one without the other?
There was a problem hiding this comment.
Yeah, my thought had been to avoid the breaking API change (as noted here), but perhaps the danger of splitting the two functions up is the greater concern.
There was a problem hiding this comment.
I agree especially as this will be less of a problem once the API changes mentioned here are implemented: dask/distributed#4406 (comment). Annotations would be expanded (and merged) in Layer.dask_unpack_annotations and derived classes would defer to this implementation in the majority of cases.
There was a problem hiding this comment.
Okay, I've combined them into a single method
jrbourbeau
left a comment
There was a problem hiding this comment.
Thanks @ian-r-rose for the fix and @sjperkins for reviewing!
|
Thanks for the reviews @jrbourbeau and @sjperkins! |
black dask/flake8 daskThis is partner to dask/distributed#4406. The motivation there is to unify the implementation of control flow like workers/retries/priority around the newer annotations system. In that PR, we found that the final step of unpacking annotations on the scheduler needed to be a non-shallow merge so that constructing a final annotations structure would allow for different layers to have different annotations. For example, two layers could have
retries: { 'a' : 2 }andretries: { 'b': 3 }. With a shallow merge the latterretrieswould overwrite the former, when in fact we want a result ofretries: { 'a': 2, 'b': 3 }.This implements that deeper-merge for annotations in
__dask_distributed_unpack__(). It also moves the default unpacking function fromdistributedto this package, in order to be more consistent with where other layers have implemented that behavior.cc @sjperkins @madsbk