From 9e231814c99132befcb93aa90e6ef89f263cc492 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 26 Oct 2020 19:27:10 +0200 Subject: [PATCH 01/14] Initial Implementation --- dask/__init__.py | 1 + dask/annotations.py | 39 +++++++++++++++++ dask/highlevelgraph.py | 83 +++++++++++++++++++++++++++++++++++- dask/tests/test_highgraph.py | 60 ++++++++++++++++++++++++++ 4 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 dask/annotations.py diff --git a/dask/__init__.py b/dask/__init__.py index 41f65daa415..896a2605747 100644 --- a/dask/__init__.py +++ b/dask/__init__.py @@ -1,4 +1,5 @@ from . import config, datasets +from .annotations import annotate from .core import istask from .local import get_sync as get diff --git a/dask/annotations.py b/dask/annotations.py new file mode 100644 index 00000000000..444355c2d7e --- /dev/null +++ b/dask/annotations.py @@ -0,0 +1,39 @@ +from contextlib import contextmanager +import threading + + +class SafeStack: + def __init__(self): + super().__init__() + self._lock = threading.Lock() + self._stack = [] + + def push(self, annotation): + with self._lock: + self._stack.append(annotation) + + def pop(self): + with self._lock: + self._stack.pop() + + def copy(self): + with self._lock: + return self._stack.copy() + + +annotation_stack = SafeStack() + + +def current_annotations(): + return annotation_stack.copy() + + +@contextmanager +def annotate(annotation): + + annotation_stack.push(annotation) + + try: + yield + finally: + annotation_stack.pop() diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index 911a403f843..881e62fac1e 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -1,9 +1,11 @@ import collections.abc +from itertools import chain from typing import Callable, Hashable, Optional, Set, Mapping, Iterable, Tuple import copy import tlz as toolz +from .annotations import current_annotations from .utils import ignoring from .base import is_dask_collection from .core import reverse_dict, keys_in_tasks @@ -27,12 +29,87 @@ def _find_layer_containing_key(key): return ret +class LayerAnnotation(collections.abc.Mapping): + pass + + +class SingleLayerAnnotation(LayerAnnotation): + """ Applies a single annotation to all keys """ + + def __init__(self, annotation, keys): + self.annotation = annotation + self.keys = set(keys) + + def __contains__(self, k): + return k in self.keys + + def __getitem__(self, k): + return self.annotation + + def __iter__(self): + return iter(self.keys) + + def __len__(self): + return len(self.keys) + + def __reduce__(self): + return (SingleLayerAnnotation, (self.annotation, self.keys)) + + +class ExplicitLayerAnnotation(LayerAnnotation): + """ Wraps a dictionary of annotations """ + + def __init__(self, annotations): + self.annotations = annotations + + def __contains__(self, k): + return k in self.annotations + + def __getitem__(self, k): + return self.annotations[k] + + def __len__(self): + return len(self.annotations) + + def __iter__(self): + return iter(self.annotations) + + def __reduce__(self): + return (ExplicitLayerAnnotation, (self.annotations,)) + + +class MapLayerAnnotation(LayerAnnotation): + """ Encapsulate a function mapping keys to annotations """ + + def __init__(self, function: Callable, keys: Set): + self.function = function + self.keys = set(keys) + + def __contains__(self, k): + return k in self.keys + + def __getitem__(self, k): + return self.function(k) + + def __iter__(self): + return iter(self.keys) + + def __len__(self): + return len(self.keys) + + def __reduce__(self): + return (MapLayerAnnotation, (self.function, self.keys)) + + class Layer(collections.abc.Mapping): """High level graph layer - This abstract class establish a protocol for high level graph layers. + This abstract class establishes a protocol for high level graph layers. """ + def get_annotations(self) -> Mapping[Hashable, Mapping]: + return {} + def cull( self, keys: Set, all_hlg_keys: Iterable ) -> Tuple["Layer", Mapping[Hashable, Set]]: @@ -156,6 +233,7 @@ def __init__(self, mapping, dependencies=None, global_dependencies=None): self.dependencies = dependencies self.global_dependencies = global_dependencies self.global_dependencies_has_been_trimmed = False + self.annotations = current_annotations() def __contains__(self, k): return k in self.mapping @@ -169,6 +247,9 @@ def __iter__(self): def __len__(self): return len(self.mapping) + def get_annotations(self): + return chain.from_iterable((a.items() for a in self.annotations)) + def get_dependencies(self, key, all_hlg_keys): if self.dependencies is None or self.global_dependencies is None: return super().get_dependencies(key, all_hlg_keys) diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index 504ccd82c10..f550205d616 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -1,8 +1,10 @@ +import pickle from functools import partial import os import pytest +import dask import dask.array as da from dask.utils_test import inc from dask.highlevelgraph import HighLevelGraph, BasicLayer, Layer @@ -110,3 +112,61 @@ def plus_one(tasks): y.dask = dsk.map_tasks(plus_one) assert_eq(y, [42] * 3) + + +def test_single_annotation(): + from dask.highlevelgraph import SingleLayerAnnotation + + a = {"x": 1, "y": (inc, "x")} + + sa = SingleLayerAnnotation({"worker": "alice"}, set(a.keys())) + assert pickle.loads(pickle.dumps(sa)) == sa + with dask.annotate(sa): + layers = { + "a": BasicLayer( + a, dependencies={"x": set(), "y": {"x"}}, global_dependencies=set() + ) + } + + assert all(v == sa.annotation for _, v in layers["a"].get_annotations()) + + +def test_explicit_annotations(): + from dask.highlevelgraph import ExplicitLayerAnnotation + + a = {"x": 1, "y": (inc, "x")} + ea = ExplicitLayerAnnotation({"y": {"resource": "GPU"}, "x": {"worker": "alice"}}) + assert pickle.loads(pickle.dumps(ea)) == ea + + with dask.annotate(ea): + layers = { + "a": BasicLayer( + a, dependencies={"x": set(), "y": {"x"}}, global_dependencies=set() + ) + } + + assert dict(layers["a"].get_annotations()) == dict(ea) + + +def annot_map_fn(key): + return key[1:] + + +def test_mapped_annotations(): + from dask.highlevelgraph import MapLayerAnnotation + + a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} + ma = MapLayerAnnotation(annot_map_fn, set(a.keys())) + assert pickle.loads(pickle.dumps(ma)) == ma + + with dask.annotate(ma): + layers = { + "a": BasicLayer( + a, + dependencies={k: set() for k in a.keys()}, + global_dependencies=set(), + ) + } + + expected = dict((k, annot_map_fn(k)) for k in a.keys()) + assert dict(layers["a"].get_annotations()) == expected From c7d5d85bcb508528f1faf639eace5e4f5ef1e8ac Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Mon, 26 Oct 2020 21:12:08 +0200 Subject: [PATCH 02/14] Support short form layer annotations --- dask/annotations.py | 3 ++- dask/highlevelgraph.py | 36 +++++++++++++++++++++++++----------- dask/tests/test_highgraph.py | 24 ++++++++++++++++++------ 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/dask/annotations.py b/dask/annotations.py index 444355c2d7e..5516b5bd54f 100644 --- a/dask/annotations.py +++ b/dask/annotations.py @@ -3,6 +3,8 @@ class SafeStack: + """ Maintains thread safe stack of current annotations """ + def __init__(self): super().__init__() self._lock = threading.Lock() @@ -30,7 +32,6 @@ def current_annotations(): @contextmanager def annotate(annotation): - annotation_stack.push(annotation) try: diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index 881e62fac1e..baeb86059d9 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -37,23 +37,24 @@ class SingleLayerAnnotation(LayerAnnotation): """ Applies a single annotation to all keys """ def __init__(self, annotation, keys): + assert type(annotation) is dict self.annotation = annotation - self.keys = set(keys) + self.map_keys = set(keys) def __contains__(self, k): - return k in self.keys + return k in self.map_keys def __getitem__(self, k): return self.annotation def __iter__(self): - return iter(self.keys) + return iter(self.map_keys) def __len__(self): - return len(self.keys) + return len(self.map_keys) def __reduce__(self): - return (SingleLayerAnnotation, (self.annotation, self.keys)) + return (SingleLayerAnnotation, (self.annotation, self.map_keys)) class ExplicitLayerAnnotation(LayerAnnotation): @@ -83,22 +84,22 @@ class MapLayerAnnotation(LayerAnnotation): def __init__(self, function: Callable, keys: Set): self.function = function - self.keys = set(keys) + self.map_keys = set(keys) def __contains__(self, k): - return k in self.keys + return k in self.map_keys def __getitem__(self, k): return self.function(k) def __iter__(self): - return iter(self.keys) + return iter(self.map_keys) def __len__(self): - return len(self.keys) + return len(self.map_keys) def __reduce__(self): - return (MapLayerAnnotation, (self.function, self.keys)) + return (MapLayerAnnotation, (self.function, self.map_keys)) class Layer(collections.abc.Mapping): @@ -233,7 +234,20 @@ def __init__(self, mapping, dependencies=None, global_dependencies=None): self.dependencies = dependencies self.global_dependencies = global_dependencies self.global_dependencies_has_been_trimmed = False - self.annotations = current_annotations() + + annotations = current_annotations() + + for i, a in enumerate(annotations): + if isinstance(a, LayerAnnotation): + continue + elif callable(a): + annotations[i] = MapLayerAnnotation(a, mapping.keys()) + elif type(a) is dict: + annotations[i] = SingleLayerAnnotation(a, mapping.keys()) + else: + raise TypeError(f"{type(a)} must be LayerAnnotation, callable or dict") + + self.annotations = annotations def __contains__(self, k): return k in self.mapping diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index f550205d616..09f3b021be4 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -114,12 +114,18 @@ def plus_one(tasks): assert_eq(y, [42] * 3) -def test_single_annotation(): +@pytest.mark.parametrize("short_form", [True, False]) +def test_single_annotation(short_form): from dask.highlevelgraph import SingleLayerAnnotation a = {"x": 1, "y": (inc, "x")} + annotation = {"worker": "alice"} + + if not short_form: + sa = SingleLayerAnnotation(annotation, set(a.keys())) + else: + sa = annotation - sa = SingleLayerAnnotation({"worker": "alice"}, set(a.keys())) assert pickle.loads(pickle.dumps(sa)) == sa with dask.annotate(sa): layers = { @@ -128,14 +134,15 @@ def test_single_annotation(): ) } - assert all(v == sa.annotation for _, v in layers["a"].get_annotations()) + assert all(v == annotation for _, v in layers["a"].get_annotations()) def test_explicit_annotations(): from dask.highlevelgraph import ExplicitLayerAnnotation a = {"x": 1, "y": (inc, "x")} - ea = ExplicitLayerAnnotation({"y": {"resource": "GPU"}, "x": {"worker": "alice"}}) + ea = {"y": {"resource": "GPU"}, "x": {"worker": "alice"}} + ea = ExplicitLayerAnnotation(ea) assert pickle.loads(pickle.dumps(ea)) == ea with dask.annotate(ea): @@ -152,11 +159,16 @@ def annot_map_fn(key): return key[1:] -def test_mapped_annotations(): +@pytest.mark.parametrize("short_form", [True, False]) +def test_mapped_annotations(short_form): from dask.highlevelgraph import MapLayerAnnotation a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} - ma = MapLayerAnnotation(annot_map_fn, set(a.keys())) + ma = annot_map_fn + + if not short_form: + ma = MapLayerAnnotation(annot_map_fn, set(a.keys())) + assert pickle.loads(pickle.dumps(ma)) == ma with dask.annotate(ma): From b486d06f32f17d375631dc9347ce98ff9edc829e Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 27 Oct 2020 18:01:25 +0200 Subject: [PATCH 03/14] Dict key views, warn on MapLayerAnnotations * Pass Mapping key views into LayerAnnotations * Convert LayerAnnotations sets to keys during pickling * Warn on MapLayerAnnotations, due to remote execution issues. --- dask/highlevelgraph.py | 19 +++++++++++------ dask/tests/test_highgraph.py | 40 ++++++++++++++++++++++++++++-------- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index baeb86059d9..fff827da9a6 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -2,6 +2,7 @@ from itertools import chain from typing import Callable, Hashable, Optional, Set, Mapping, Iterable, Tuple import copy +import warnings import tlz as toolz @@ -39,7 +40,7 @@ class SingleLayerAnnotation(LayerAnnotation): def __init__(self, annotation, keys): assert type(annotation) is dict self.annotation = annotation - self.map_keys = set(keys) + self.map_keys = keys def __contains__(self, k): return k in self.map_keys @@ -54,7 +55,7 @@ def __len__(self): return len(self.map_keys) def __reduce__(self): - return (SingleLayerAnnotation, (self.annotation, self.map_keys)) + return (SingleLayerAnnotation, (self.annotation, set(self.map_keys))) class ExplicitLayerAnnotation(LayerAnnotation): @@ -82,9 +83,15 @@ def __reduce__(self): class MapLayerAnnotation(LayerAnnotation): """ Encapsulate a function mapping keys to annotations """ - def __init__(self, function: Callable, keys: Set): + def __init__(self, function: Callable, keys): self.function = function - self.map_keys = set(keys) + self.map_keys = keys + warnings.warn( + "Marked for deprecation as we don't want " + "to pickle functions for remote execution " + "on the distributed scheduler. Reify and " + "use ExplicitLayerAnnotation instead" + ) def __contains__(self, k): return k in self.map_keys @@ -99,7 +106,7 @@ def __len__(self): return len(self.map_keys) def __reduce__(self): - return (MapLayerAnnotation, (self.function, self.map_keys)) + return (MapLayerAnnotation, (self.function, set(self.map_keys))) class Layer(collections.abc.Mapping): @@ -241,7 +248,7 @@ def __init__(self, mapping, dependencies=None, global_dependencies=None): if isinstance(a, LayerAnnotation): continue elif callable(a): - annotations[i] = MapLayerAnnotation(a, mapping.keys()) + annotations[i] = ExplicitLayerAnnotation({k: a(k) for k in mapping}) elif type(a) is dict: annotations[i] = SingleLayerAnnotation(a, mapping.keys()) else: diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index 09f3b021be4..2e26e8f8d7c 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -122,7 +122,7 @@ def test_single_annotation(short_form): annotation = {"worker": "alice"} if not short_form: - sa = SingleLayerAnnotation(annotation, set(a.keys())) + sa = SingleLayerAnnotation(annotation, a.keys()) else: sa = annotation @@ -156,19 +156,15 @@ def test_explicit_annotations(): def annot_map_fn(key): - return key[1:] + return {"block_id": key[1:]} -@pytest.mark.parametrize("short_form", [True, False]) -def test_mapped_annotations(short_form): - from dask.highlevelgraph import MapLayerAnnotation +def test_mapped_annotations_short_form(): + from dask.highlevelgraph import ExplicitLayerAnnotation a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} ma = annot_map_fn - if not short_form: - ma = MapLayerAnnotation(annot_map_fn, set(a.keys())) - assert pickle.loads(pickle.dumps(ma)) == ma with dask.annotate(ma): @@ -180,5 +176,33 @@ def test_mapped_annotations(short_form): ) } + assert isinstance(layers["a"].annotations[0], ExplicitLayerAnnotation) + + expected = dict((k, annot_map_fn(k)) for k in a.keys()) + assert dict(layers["a"].get_annotations()) == expected + + +def test_mapped_annotations_long_form(): + from dask.highlevelgraph import MapLayerAnnotation + + a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} + + with pytest.warns(UserWarning): + ma = MapLayerAnnotation(annot_map_fn, a.keys()) + + with pytest.warns(UserWarning): + assert pickle.loads(pickle.dumps(ma)) == ma + + with dask.annotate(ma): + layers = { + "a": BasicLayer( + a, + dependencies={k: set() for k in a.keys()}, + global_dependencies=set(), + ) + } + + assert isinstance(layers["a"].annotations[0], MapLayerAnnotation) + expected = dict((k, annot_map_fn(k)) for k in a.keys()) assert dict(layers["a"].get_annotations()) == expected From 8bd10c397a74324e9a8dc4844503ad5e183fefa2 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 27 Oct 2020 18:36:13 +0200 Subject: [PATCH 04/14] Correectly handle multiple annotations --- dask/highlevelgraph.py | 9 ++++++++- dask/tests/test_highgraph.py | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index fff827da9a6..a20f9f4b995 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -1,3 +1,4 @@ +from collections import defaultdict import collections.abc from itertools import chain from typing import Callable, Hashable, Optional, Set, Mapping, Iterable, Tuple @@ -269,7 +270,13 @@ def __len__(self): return len(self.mapping) def get_annotations(self): - return chain.from_iterable((a.items() for a in self.annotations)) + annotations = defaultdict(dict) + + for a in self.annotations: + for k, v in a.items(): + annotations[k].update(v) + + return annotations def get_dependencies(self, key, all_hlg_keys): if self.dependencies is None or self.global_dependencies is None: diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index 2e26e8f8d7c..d36748ad28f 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -206,3 +206,20 @@ def test_mapped_annotations_long_form(): expected = dict((k, annot_map_fn(k)) for k in a.keys()) assert dict(layers["a"].get_annotations()) == expected + + +def test_multiple_annotations(): + a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} + + with dask.annotate(annot_map_fn): + with dask.annotate({"resource": "GPU"}): + layers = { + "a": BasicLayer( + a, + dependencies={k: set() for k in a.keys()}, + global_dependencies=set(), + ) + } + + expected = {k: {"block_id": k[1:], "resource": "GPU"} for k in a.keys()} + assert dict(layers["a"].get_annotations()) == expected From 044072bad3d3a5bd43d7406ffef128e6d6c35b97 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 27 Oct 2020 18:40:57 +0200 Subject: [PATCH 05/14] flake8 --- dask/highlevelgraph.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index a20f9f4b995..436086bfae8 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -1,6 +1,5 @@ from collections import defaultdict import collections.abc -from itertools import chain from typing import Callable, Hashable, Optional, Set, Mapping, Iterable, Tuple import copy import warnings From 3b07f3471d5fae4de2b06e3161e9247f25fb7a31 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Tue, 27 Oct 2020 19:28:03 +0200 Subject: [PATCH 06/14] Fix dict iteration --- dask/tests/test_highgraph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index d36748ad28f..3f5d44b0fb6 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -134,7 +134,7 @@ def test_single_annotation(short_form): ) } - assert all(v == annotation for _, v in layers["a"].get_annotations()) + assert all(v == annotation for v in layers["a"].get_annotations().values()) def test_explicit_annotations(): From 13887df648d8332d3a3c15f8dfdde8652fa4b01f Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 09:14:44 +0200 Subject: [PATCH 07/14] Rework tests --- dask/highlevelgraph.py | 13 ++++ dask/tests/test_highgraph.py | 127 ++++++++++------------------------- 2 files changed, 48 insertions(+), 92 deletions(-) diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index 436086bfae8..5ae0a368800 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -48,6 +48,13 @@ def __contains__(self, k): def __getitem__(self, k): return self.annotation + def __eq__(self, other): + return ( + type(other) is SingleLayerAnnotation + and self.annotation == other.annotation + and self.map_keys == other.map_keys + ) + def __iter__(self): return iter(self.map_keys) @@ -70,6 +77,12 @@ def __contains__(self, k): def __getitem__(self, k): return self.annotations[k] + def __eq__(self, other): + return ( + type(other) is ExplicitLayerAnnotation + and self.annotations == other.annotations + ) + def __len__(self): return len(self.annotations) diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index 3f5d44b0fb6..19fc2a048f4 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -1,3 +1,4 @@ +from collections import defaultdict import pickle from functools import partial import os @@ -6,8 +7,10 @@ import dask import dask.array as da +from dask.core import flatten from dask.utils_test import inc from dask.highlevelgraph import HighLevelGraph, BasicLayer, Layer +from dask.highlevelgraph import ExplicitLayerAnnotation, SingleLayerAnnotation from dask.blockwise import Blockwise from dask.array.utils import assert_eq @@ -114,112 +117,52 @@ def plus_one(tasks): assert_eq(y, [42] * 3) -@pytest.mark.parametrize("short_form", [True, False]) -def test_single_annotation(short_form): - from dask.highlevelgraph import SingleLayerAnnotation - - a = {"x": 1, "y": (inc, "x")} +def test_single_annotation(): annotation = {"worker": "alice"} - if not short_form: - sa = SingleLayerAnnotation(annotation, a.keys()) - else: - sa = annotation - - assert pickle.loads(pickle.dumps(sa)) == sa - with dask.annotate(sa): - layers = { - "a": BasicLayer( - a, dependencies={"x": set(), "y": {"x"}}, global_dependencies=set() - ) - } - - assert all(v == annotation for v in layers["a"].get_annotations().values()) - - -def test_explicit_annotations(): - from dask.highlevelgraph import ExplicitLayerAnnotation + with dask.annotate(annotation): + A = da.ones((10, 10), chunks=(5, 5)) - a = {"x": 1, "y": (inc, "x")} - ea = {"y": {"resource": "GPU"}, "x": {"worker": "alice"}} - ea = ExplicitLayerAnnotation(ea) - assert pickle.loads(pickle.dumps(ea)) == ea - - with dask.annotate(ea): - layers = { - "a": BasicLayer( - a, dependencies={"x": set(), "y": {"x"}}, global_dependencies=set() - ) - } - - assert dict(layers["a"].get_annotations()) == dict(ea) + expected = SingleLayerAnnotation(annotation, set(flatten(A.__dask_keys__()))) + alayer = A.__dask_graph__().layers[A.name] + assert expected == pickle.loads(pickle.dumps(expected)) + assert alayer.annotations == [expected] def annot_map_fn(key): return {"block_id": key[1:]} -def test_mapped_annotations_short_form(): - from dask.highlevelgraph import ExplicitLayerAnnotation - - a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} - ma = annot_map_fn - - assert pickle.loads(pickle.dumps(ma)) == ma - - with dask.annotate(ma): - layers = { - "a": BasicLayer( - a, - dependencies={k: set() for k in a.keys()}, - global_dependencies=set(), - ) - } - - assert isinstance(layers["a"].annotations[0], ExplicitLayerAnnotation) - - expected = dict((k, annot_map_fn(k)) for k in a.keys()) - assert dict(layers["a"].get_annotations()) == expected - - -def test_mapped_annotations_long_form(): - from dask.highlevelgraph import MapLayerAnnotation - - a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} +def test_mapped_annotations(): + with dask.annotate(annot_map_fn): + A = da.ones((10, 10), chunks=(5, 5)) - with pytest.warns(UserWarning): - ma = MapLayerAnnotation(annot_map_fn, a.keys()) + expected = ExplicitLayerAnnotation( + {k: annot_map_fn(k) for k in flatten(A.__dask_keys__())} + ) + alayer = A.__dask_graph__().layers[A.name] + assert expected == pickle.loads(pickle.dumps(expected)) + assert alayer.annotations == [expected] - with pytest.warns(UserWarning): - assert pickle.loads(pickle.dumps(ma)) == ma - with dask.annotate(ma): - layers = { - "a": BasicLayer( - a, - dependencies={k: set() for k in a.keys()}, - global_dependencies=set(), - ) - } +def test_multiple_annotations(): + with dask.annotate(annot_map_fn): + with dask.annotate({"resource": "GPU"}): + A = da.ones((10, 10), chunks=(5, 5)) - assert isinstance(layers["a"].annotations[0], MapLayerAnnotation) + # akeys = set(flatten(A.__dask_keys__())) + alayer = A.__dask_graph__().layers[A.name] + akeys = alayer.keys() + explicit = ExplicitLayerAnnotation({k: {"block_id": k[1:]} for k in akeys}) + single = SingleLayerAnnotation({"resource": "GPU"}, akeys) + assert alayer.annotations == [explicit, single] - expected = dict((k, annot_map_fn(k)) for k in a.keys()) - assert dict(layers["a"].get_annotations()) == expected + expected = defaultdict(dict) + for k, v in explicit.items(): + expected[k].update(v) -def test_multiple_annotations(): - a = {("x", 0): (inc, 0), ("x", 1): (inc, 1)} + for k, v in single.items(): + expected[k].update(v) - with dask.annotate(annot_map_fn): - with dask.annotate({"resource": "GPU"}): - layers = { - "a": BasicLayer( - a, - dependencies={k: set() for k in a.keys()}, - global_dependencies=set(), - ) - } - - expected = {k: {"block_id": k[1:], "resource": "GPU"} for k in a.keys()} - assert dict(layers["a"].get_annotations()) == expected + assert alayer.get_annotations() == expected From d40b4422ace81fec38f64714212bd0f8cd94e95b Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 18:50:25 +0200 Subject: [PATCH 08/14] Simplify annotations --- dask/__init__.py | 5 +- dask/annotations.py | 40 ------------ dask/core.py | 14 ++++ dask/highlevelgraph.py | 123 +---------------------------------- dask/tests/test_highgraph.py | 61 +++++------------ 5 files changed, 38 insertions(+), 205 deletions(-) delete mode 100644 dask/annotations.py diff --git a/dask/__init__.py b/dask/__init__.py index 896a2605747..aa5699761ac 100644 --- a/dask/__init__.py +++ b/dask/__init__.py @@ -1,6 +1,7 @@ from . import config, datasets -from .annotations import annotate -from .core import istask + +# from .annotations import annotate +from .core import istask, annotate from .local import get_sync as get try: diff --git a/dask/annotations.py b/dask/annotations.py deleted file mode 100644 index 5516b5bd54f..00000000000 --- a/dask/annotations.py +++ /dev/null @@ -1,40 +0,0 @@ -from contextlib import contextmanager -import threading - - -class SafeStack: - """ Maintains thread safe stack of current annotations """ - - def __init__(self): - super().__init__() - self._lock = threading.Lock() - self._stack = [] - - def push(self, annotation): - with self._lock: - self._stack.append(annotation) - - def pop(self): - with self._lock: - self._stack.pop() - - def copy(self): - with self._lock: - return self._stack.copy() - - -annotation_stack = SafeStack() - - -def current_annotations(): - return annotation_stack.copy() - - -@contextmanager -def annotate(annotation): - annotation_stack.push(annotation) - - try: - yield - finally: - annotation_stack.pop() diff --git a/dask/core.py b/dask/core.py index 8fb998afeb4..03225641684 100644 --- a/dask/core.py +++ b/dask/core.py @@ -1,10 +1,24 @@ from collections import defaultdict +from contextlib import contextmanager +from .config import get as config_get, set as config_set from .utils_test import add, inc # noqa: F401 no_default = "__no_default__" +@contextmanager +def annotate(**kwargs): + prev_annotations = config_get("annotations", {}) + annotations = { + **prev_annotations, + **{f"annotations.{k}": v for k, v in kwargs.items()}, + } + + with config_set(annotations): + yield + + def ishashable(x): """Is x hashable? diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index e0f75e755ff..dcc71bfe6d4 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -1,4 +1,3 @@ -from collections import defaultdict import abc import collections.abc from typing import ( @@ -13,11 +12,10 @@ Tuple, ) import copy -import warnings import tlz as toolz -from .annotations import current_annotations +from .config import get as config_get from .utils import ignoring from .base import is_dask_collection from .core import reverse_dict, keys_in_tasks @@ -41,98 +39,6 @@ def _find_layer_containing_key(key): return ret -class LayerAnnotation(collections.abc.Mapping): - pass - - -class SingleLayerAnnotation(LayerAnnotation): - """ Applies a single annotation to all keys """ - - def __init__(self, annotation, keys): - assert type(annotation) is dict - self.annotation = annotation - self.map_keys = keys - - def __contains__(self, k): - return k in self.map_keys - - def __getitem__(self, k): - return self.annotation - - def __eq__(self, other): - return ( - type(other) is SingleLayerAnnotation - and self.annotation == other.annotation - and self.map_keys == other.map_keys - ) - - def __iter__(self): - return iter(self.map_keys) - - def __len__(self): - return len(self.map_keys) - - def __reduce__(self): - return (SingleLayerAnnotation, (self.annotation, set(self.map_keys))) - - -class ExplicitLayerAnnotation(LayerAnnotation): - """ Wraps a dictionary of annotations """ - - def __init__(self, annotations): - self.annotations = annotations - - def __contains__(self, k): - return k in self.annotations - - def __getitem__(self, k): - return self.annotations[k] - - def __eq__(self, other): - return ( - type(other) is ExplicitLayerAnnotation - and self.annotations == other.annotations - ) - - def __len__(self): - return len(self.annotations) - - def __iter__(self): - return iter(self.annotations) - - def __reduce__(self): - return (ExplicitLayerAnnotation, (self.annotations,)) - - -class MapLayerAnnotation(LayerAnnotation): - """ Encapsulate a function mapping keys to annotations """ - - def __init__(self, function: Callable, keys): - self.function = function - self.map_keys = keys - warnings.warn( - "Marked for deprecation as we don't want " - "to pickle functions for remote execution " - "on the distributed scheduler. Reify and " - "use ExplicitLayerAnnotation instead" - ) - - def __contains__(self, k): - return k in self.map_keys - - def __getitem__(self, k): - return self.function(k) - - def __iter__(self): - return iter(self.map_keys) - - def __len__(self): - return len(self.map_keys) - - def __reduce__(self): - return (MapLayerAnnotation, (self.function, set(self.map_keys))) - - class Layer(collections.abc.Mapping): """High level graph layer @@ -154,9 +60,6 @@ def is_materialized(self) -> bool: """Return whether the layer is materialized or not""" return True - def get_annotations(self) -> Mapping[Hashable, Mapping]: - return {} - def get_output_keys(self) -> Set: """Return a set of all output keys @@ -349,19 +252,8 @@ def __init__(self, mapping, dependencies=None, global_dependencies=None): self.global_dependencies = global_dependencies self.global_dependencies_has_been_trimmed = False - annotations = current_annotations() - - for i, a in enumerate(annotations): - if isinstance(a, LayerAnnotation): - continue - elif callable(a): - annotations[i] = ExplicitLayerAnnotation({k: a(k) for k in mapping}) - elif type(a) is dict: - annotations[i] = SingleLayerAnnotation(a, mapping.keys()) - else: - raise TypeError(f"{type(a)} must be LayerAnnotation, callable or dict") - - self.annotations = annotations + annotations = config_get("annotations", None) + self.annotations = None if annotations is None else annotations.copy() def __contains__(self, k): return k in self.mapping @@ -375,15 +267,6 @@ def __iter__(self): def __len__(self): return len(self.mapping) - def get_annotations(self): - annotations = defaultdict(dict) - - for a in self.annotations: - for k, v in a.items(): - annotations[k].update(v) - - return annotations - def is_materialized(self): return True diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index 19fc2a048f4..2d6ff8dba03 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -1,5 +1,3 @@ -from collections import defaultdict -import pickle from functools import partial import os @@ -7,10 +5,8 @@ import dask import dask.array as da -from dask.core import flatten from dask.utils_test import inc from dask.highlevelgraph import HighLevelGraph, BasicLayer, Layer -from dask.highlevelgraph import ExplicitLayerAnnotation, SingleLayerAnnotation from dask.blockwise import Blockwise from dask.array.utils import assert_eq @@ -117,52 +113,31 @@ def plus_one(tasks): assert_eq(y, [42] * 3) -def test_single_annotation(): - annotation = {"worker": "alice"} - - with dask.annotate(annotation): - A = da.ones((10, 10), chunks=(5, 5)) - - expected = SingleLayerAnnotation(annotation, set(flatten(A.__dask_keys__()))) - alayer = A.__dask_graph__().layers[A.name] - assert expected == pickle.loads(pickle.dumps(expected)) - assert alayer.annotations == [expected] - - def annot_map_fn(key): - return {"block_id": key[1:]} - - -def test_mapped_annotations(): - with dask.annotate(annot_map_fn): + return key[1:] + + +@pytest.mark.parametrize( + "annotation", + [ + {"worker": "alice"}, + {"block_id": annot_map_fn}, + ], +) +def test_single_annotation(annotation): + with dask.annotate(**annotation): A = da.ones((10, 10), chunks=(5, 5)) - expected = ExplicitLayerAnnotation( - {k: annot_map_fn(k) for k in flatten(A.__dask_keys__())} - ) alayer = A.__dask_graph__().layers[A.name] - assert expected == pickle.loads(pickle.dumps(expected)) - assert alayer.annotations == [expected] + assert alayer.annotations == annotation + assert dask.config.get("annotations", None) is None def test_multiple_annotations(): - with dask.annotate(annot_map_fn): - with dask.annotate({"resource": "GPU"}): + with dask.annotate(block_id=annot_map_fn): + with dask.annotate(resource="GPU"): A = da.ones((10, 10), chunks=(5, 5)) - # akeys = set(flatten(A.__dask_keys__())) alayer = A.__dask_graph__().layers[A.name] - akeys = alayer.keys() - explicit = ExplicitLayerAnnotation({k: {"block_id": k[1:]} for k in akeys}) - single = SingleLayerAnnotation({"resource": "GPU"}, akeys) - assert alayer.annotations == [explicit, single] - - expected = defaultdict(dict) - - for k, v in explicit.items(): - expected[k].update(v) - - for k, v in single.items(): - expected[k].update(v) - - assert alayer.get_annotations() == expected + assert alayer.annotations == {"resource": "GPU", "block_id": annot_map_fn} + assert dask.config.get("annotations", None) is None From a42436cd993237ba1af7fb67c76fe7ea90b1704f Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 19:44:44 +0200 Subject: [PATCH 09/14] Address review comments --- dask/__init__.py | 12 ++++++--- dask/base.py | 49 +++++++++++++++++++++++++++++++++++- dask/core.py | 14 ----------- dask/highlevelgraph.py | 4 +-- dask/tests/test_highgraph.py | 5 ++++ docs/source/api.rst | 1 + 6 files changed, 65 insertions(+), 20 deletions(-) diff --git a/dask/__init__.py b/dask/__init__.py index aa5699761ac..aec1a776f62 100644 --- a/dask/__init__.py +++ b/dask/__init__.py @@ -1,7 +1,6 @@ from . import config, datasets -# from .annotations import annotate -from .core import istask, annotate +from .core import istask from .local import get_sync as get try: @@ -9,7 +8,14 @@ except ImportError: pass try: - from .base import visualize, compute, persist, optimize, is_dask_collection + from .base import ( + visualize, + annotate, + compute, + persist, + optimize, + is_dask_collection, + ) except ImportError: pass diff --git a/dask/base.py b/dask/base.py index 49961d70537..9a27682ba91 100644 --- a/dask/base.py +++ b/dask/base.py @@ -1,5 +1,6 @@ from collections import OrderedDict from collections.abc import Mapping, Iterator +from contextlib import contextmanager from functools import partial from hashlib import md5 from operator import getitem @@ -23,6 +24,7 @@ __all__ = ( "DaskMethodsMixin", + "annotate", "is_dask_collection", "compute", "persist", @@ -33,6 +35,51 @@ ) +@contextmanager +def annotate(**annotations): + """Content Manager for setting HighLevelGraph Layer annotations. + + Annotations are metadata or soft constraints associated with + tasks that dask schedulers may choose to respect: They signal intent + without enforcing hard constraints. As such, they are + primarily designed for use with the distributed scheduler. + + Almost any object can serve as an annotation, but small Python objects + are preferred, while large objects such as NumPy arrays should be discouraged. + + Callables supplied as an annotation should take a single *key* argument and + produce the appropriate annotation. Individual task keys in the annotated collection + are supplied to the callable. + + Parameters + ---------- + **annotations : key-value pairs + + Examples + -------- + + All tasks within array A should have priority 100 and be retried 3 times + on failure. + >>> with dask.annotate(priority=100, retries=3): + >>> A = da.ones((10000, 10000)) + + Prioritise tasks within Array A on flattened block ID. + + >>> nblocks = (10, 10) + >>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]): + >>> A = da.ones((1000, 1000), chunks=(100, 100)) + """ + + prev_annotations = config.get("annotations", {}) + new_annotations = { + **prev_annotations, + **{f"annotations.{k}": v for k, v in annotations.items()}, + } + + with config.set(new_annotations): + yield + + def is_dask_collection(x): """Returns ``True`` if ``x`` is a dask collection""" try: @@ -95,7 +142,7 @@ def visualize(self, filename="mydask", format=None, optimize_graph=False, **kwar filename=filename, format=format, optimize_graph=optimize_graph, - **kwargs + **kwargs, ) def persist(self, **kwargs): diff --git a/dask/core.py b/dask/core.py index 03225641684..8fb998afeb4 100644 --- a/dask/core.py +++ b/dask/core.py @@ -1,24 +1,10 @@ from collections import defaultdict -from contextlib import contextmanager -from .config import get as config_get, set as config_set from .utils_test import add, inc # noqa: F401 no_default = "__no_default__" -@contextmanager -def annotate(**kwargs): - prev_annotations = config_get("annotations", {}) - annotations = { - **prev_annotations, - **{f"annotations.{k}": v for k, v in kwargs.items()}, - } - - with config_set(annotations): - yield - - def ishashable(x): """Is x hashable? diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index dcc71bfe6d4..2ab6abe64d0 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -15,7 +15,7 @@ import tlz as toolz -from .config import get as config_get +from . import config from .utils import ignoring from .base import is_dask_collection from .core import reverse_dict, keys_in_tasks @@ -252,7 +252,7 @@ def __init__(self, mapping, dependencies=None, global_dependencies=None): self.global_dependencies = global_dependencies self.global_dependencies_has_been_trimmed = False - annotations = config_get("annotations", None) + annotations = config.get("annotations", None) self.annotations = None if annotations is None else annotations.copy() def __contains__(self, k): diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index 2d6ff8dba03..9d8106bfd1a 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -133,6 +133,11 @@ def test_single_annotation(annotation): assert dask.config.get("annotations", None) is None +def test_no_annotation(): + A = da.ones((10, 10), chunks=(5, 5)) + assert A.__dask_graph__().layers[A.name].annotations is None + + def test_multiple_annotations(): with dask.annotate(block_id=annot_map_fn): with dask.annotate(resource="GPU"): diff --git a/docs/source/api.rst b/docs/source/api.rst index af0a6cce425..9362b8adfee 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -46,6 +46,7 @@ real-time or advanced operation. This more advanced API is available in the `Dask distributed documentation `_ +.. autofunciton:: annotate .. autofunction:: compute .. autofunction:: is_dask_collection .. autofunction:: optimize From 0968ad7a8c889b49be6579161311646612927ba1 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 20:21:25 +0200 Subject: [PATCH 10/14] Move annotation capture to Layer --- dask/blockwise.py | 1 + dask/highlevelgraph.py | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dask/blockwise.py b/dask/blockwise.py index 10c10083ab3..44c2ca3546c 100644 --- a/dask/blockwise.py +++ b/dask/blockwise.py @@ -210,6 +210,7 @@ def __init__( new_axes=None, io_subgraph=None, ): + super().__init__() self.output = output self.output_indices = tuple(output_indices) self.io_subgraph = io_subgraph[1] if io_subgraph else None diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index 2ab6abe64d0..389d3f34fe4 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -55,6 +55,10 @@ class Layer(collections.abc.Mapping): implementations. """ + def __init__(self): + annotations = config.get("annotations", None) + self.annotations = None if annotations is None else annotations.copy() + @abc.abstractmethod def is_materialized(self) -> bool: """Return whether the layer is materialized or not""" @@ -247,14 +251,12 @@ class BasicLayer(Layer): """ def __init__(self, mapping, dependencies=None, global_dependencies=None): + super().__init__() self.mapping = mapping self.dependencies = dependencies self.global_dependencies = global_dependencies self.global_dependencies_has_been_trimmed = False - annotations = config.get("annotations", None) - self.annotations = None if annotations is None else annotations.copy() - def __contains__(self, k): return k in self.mapping From 1cf1a40c52c1b4cac9209c3629ff8664eade3ad7 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 20:22:51 +0200 Subject: [PATCH 11/14] tests --- dask/tests/test_highgraph.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index 9d8106bfd1a..b39936e037b 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -133,16 +133,20 @@ def test_single_annotation(annotation): assert dask.config.get("annotations", None) is None -def test_no_annotation(): - A = da.ones((10, 10), chunks=(5, 5)) - assert A.__dask_graph__().layers[A.name].annotations is None - - def test_multiple_annotations(): with dask.annotate(block_id=annot_map_fn): with dask.annotate(resource="GPU"): A = da.ones((10, 10), chunks=(5, 5)) + B = A + 1 + + C = B + 1 + + assert dask.config.get("annotations", None) is None + alayer = A.__dask_graph__().layers[A.name] + blayer = B.__dask_graph__().layers[B.name] + clayer = C.__dask_graph__().layers[B.name] assert alayer.annotations == {"resource": "GPU", "block_id": annot_map_fn} - assert dask.config.get("annotations", None) is None + assert blayer.annotations == {"block_id": annot_map_fn} + assert clayer.annotations is None From 63ee221dffc69b064065be882852b7d79def6bb9 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 20:30:55 +0200 Subject: [PATCH 12/14] Use copy.copy to copy annotations --- dask/highlevelgraph.py | 3 +-- dask/tests/test_highgraph.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/dask/highlevelgraph.py b/dask/highlevelgraph.py index 389d3f34fe4..9dc2a6e3053 100644 --- a/dask/highlevelgraph.py +++ b/dask/highlevelgraph.py @@ -56,8 +56,7 @@ class Layer(collections.abc.Mapping): """ def __init__(self): - annotations = config.get("annotations", None) - self.annotations = None if annotations is None else annotations.copy() + self.annotations = copy.copy(config.get("annotations", None)) @abc.abstractmethod def is_materialized(self) -> bool: diff --git a/dask/tests/test_highgraph.py b/dask/tests/test_highgraph.py index b39936e037b..08fa457de4b 100644 --- a/dask/tests/test_highgraph.py +++ b/dask/tests/test_highgraph.py @@ -146,7 +146,7 @@ def test_multiple_annotations(): alayer = A.__dask_graph__().layers[A.name] blayer = B.__dask_graph__().layers[B.name] - clayer = C.__dask_graph__().layers[B.name] + clayer = C.__dask_graph__().layers[C.name] assert alayer.annotations == {"resource": "GPU", "block_id": annot_map_fn} assert blayer.annotations == {"block_id": annot_map_fn} assert clayer.annotations is None From ba5678e48d1abbbbf7cf595ff1de18ef7ea6be36 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 23:11:11 +0200 Subject: [PATCH 13/14] fix docstrings --- dask/base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dask/base.py b/dask/base.py index 9a27682ba91..8c05d575c64 100644 --- a/dask/base.py +++ b/dask/base.py @@ -60,14 +60,15 @@ def annotate(**annotations): All tasks within array A should have priority 100 and be retried 3 times on failure. + >>> with dask.annotate(priority=100, retries=3): - >>> A = da.ones((10000, 10000)) + A = da.ones((10000, 10000)) Prioritise tasks within Array A on flattened block ID. >>> nblocks = (10, 10) >>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]): - >>> A = da.ones((1000, 1000), chunks=(100, 100)) + A = da.ones((1000, 1000), chunks=(100, 100)) """ prev_annotations = config.get("annotations", {}) From faac7e5654eb7244741fca499c2f22d342bb4761 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 4 Nov 2020 23:27:26 +0200 Subject: [PATCH 14/14] fix docs more --- docs/source/api.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/api.rst b/docs/source/api.rst index 9362b8adfee..c4be7735f8a 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -46,7 +46,7 @@ real-time or advanced operation. This more advanced API is available in the `Dask distributed documentation `_ -.. autofunciton:: annotate +.. autofunction:: annotate .. autofunction:: compute .. autofunction:: is_dask_collection .. autofunction:: optimize