Skip to content

Commit 44cd9bc

Browse files
author
Ian Rose
authored
Fuse compatible annotations (#9402)
1 parent 097decd commit 44cd9bc

4 files changed

Lines changed: 128 additions & 7 deletions

File tree

dask/array/tests/test_atop.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
rewrite_blockwise,
1717
)
1818
from dask.highlevelgraph import HighLevelGraph
19-
from dask.utils_test import dec, inc
19+
from dask.utils_test import dec, hlg_layer_topological, inc
2020

2121
a, b, c, d, e, f, g = "a", "b", "c", "d", "e", "f", "g"
2222
_0, _1, _2, _3, _4, _5, _6, _7, _8, _9 = (
@@ -324,7 +324,54 @@ def test_optimize_blockwise():
324324
)
325325

326326

327-
def test_optimize_blockwise_annotations():
327+
def test_optimize_blockwise_control_annotations():
    """
    Blockwise layers whose annotations differ only in the scheduler control
    keys (retries, priority, workers, allow_other_workers, resources) should
    still be fused, with the annotations merged on the fused layer.
    """
    base = da.ones(10, chunks=(5,))
    unannotated = base + 1

    with dask.annotate(retries=5, workers=["a", "b", "c"], allow_other_workers=False):
        with_retries = unannotated + 2

    with dask.annotate(priority=2, workers=["b", "c", "d"], allow_other_workers=True):
        with_priority = with_retries + 3

    with dask.annotate(retries=3, resources={"GPU": 2, "Memory": 10}):
        with_resources = with_priority + 4

    with dask.annotate(priority=4, resources={"GPU": 5, "Memory": 4}):
        last_fusable = with_resources + 5

    # A custom annotation key blocks fusion for this layer, and therefore
    # also for the un-annotated layer built on top of it.
    with dask.annotate(foo="bar"):
        custom = last_fusable + 6

    result = custom + 6
    graph = result.dask

    optimized = da.optimization.optimize_blockwise(graph)

    # Everything before the custom annotation collapses into a single layer,
    # leaving: fused layer, custom-annotated layer, final layer.
    assert len(optimized.layers) == 3
    fused_layer = hlg_layer_topological(optimized, 0)  # topologically first
    fused = fused_layer.annotations

    assert len(fused) == 5
    assert fused["priority"] == 4  # max
    assert fused["retries"] == 5  # max
    assert fused["allow_other_workers"] is False  # more restrictive wins
    assert set(fused["workers"]) == {"b", "c"}  # intersection
    assert fused["resources"] == {"GPU": 5, "Memory": 10}  # per-resource max

    # With annotation fusion switched off, only the first two layers (which
    # carry identical annotations) can still be fused.
    with dask.config.set({"optimization.annotations.fuse": False}):
        optimized = da.optimization.optimize_blockwise(graph)
        assert len(optimized.layers) == 7
372+
373+
374+
def test_optimize_blockwise_custom_annotations():
328375
a = da.ones(10, chunks=(5,))
329376
b = a + 1
330377

dask/blockwise.py

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import tlz as toolz
1111

12+
import dask
1213
from dask.base import clone_key, get_name_from_key, tokenize
1314
from dask.core import flatten, keys_in_tasks, reverse_dict
1415
from dask.delayed import unpack_collections
@@ -1351,10 +1352,9 @@ def _optimize_blockwise(full_graph, keys=()):
13511352
):
13521353
stack.append(dep)
13531354
continue
1354-
if (
1355-
blockwise_layers
1356-
and layers[next(iter(blockwise_layers))].annotations
1357-
!= layers[dep].annotations
1355+
if blockwise_layers and not _can_fuse_annotations(
1356+
layers[next(iter(blockwise_layers))].annotations,
1357+
layers[dep].annotations,
13581358
):
13591359
stack.append(dep)
13601360
continue
@@ -1412,6 +1412,60 @@ def _unique_dep(dep, ind):
14121412
return dep + "_" + "_".join(str(i) for i in list(ind))
14131413

14141414

1415+
def _can_fuse_annotations(a: dict | None, b: dict | None) -> bool:
1416+
"""
1417+
Treat the special annotation keys, as fusable since we can apply simple
1418+
rules to capture their intent in a fused layer.
1419+
"""
1420+
if a == b:
1421+
return True
1422+
1423+
if dask.config.get("optimization.annotations.fuse") is False:
1424+
return False
1425+
1426+
fusable = {"retries", "priority", "resources", "workers", "allow_other_workers"}
1427+
if (not a or all(k in fusable for k in a)) and (
1428+
not b or all(k in fusable for k in b)
1429+
):
1430+
return True
1431+
1432+
return False
1433+
1434+
1435+
def _fuse_annotations(*args: dict) -> dict:
1436+
"""
1437+
Given an iterable of annotations dictionaries, fuse them according
1438+
to some simple rules.
1439+
"""
1440+
# First, do a basic dict merge -- we are presuming that these have already
1441+
# been gated by `_can_fuse_annotations`.
1442+
annotations = toolz.merge(*args)
1443+
# Max of layer retries
1444+
retries = [a["retries"] for a in args if "retries" in a]
1445+
if retries:
1446+
annotations["retries"] = max(retries)
1447+
# Max of layer priorities
1448+
priorities = [a["priority"] for a in args if "priority" in a]
1449+
if priorities:
1450+
annotations["priority"] = max(priorities)
1451+
# Max of all the layer resources
1452+
resources = [a["resources"] for a in args if "resources" in a]
1453+
if resources:
1454+
annotations["resources"] = toolz.merge_with(max, *resources)
1455+
# Intersection of all the worker restrictions
1456+
workers = [a["workers"] for a in args if "workers" in a]
1457+
if workers:
1458+
annotations["workers"] = list(set.intersection(*[set(w) for w in workers]))
1459+
# More restrictive of allow_other_workers
1460+
allow_other_workers = [
1461+
a["allow_other_workers"] for a in args if "allow_other_workers" in a
1462+
]
1463+
if allow_other_workers:
1464+
annotations["allow_other_workers"] = all(allow_other_workers)
1465+
1466+
return annotations
1467+
1468+
14151469
def rewrite_blockwise(inputs):
14161470
"""Rewrite a stack of Blockwise expressions into a single blockwise expression
14171471
@@ -1435,6 +1489,9 @@ def rewrite_blockwise(inputs):
14351489
# Fast path: if there's only one input we can just use it as-is.
14361490
return inputs[0]
14371491

1492+
fused_annotations = _fuse_annotations(
1493+
*[i.annotations for i in inputs if i.annotations]
1494+
)
14381495
inputs = {inp.output: inp for inp in inputs}
14391496
dependencies = {
14401497
inp.output: {d for d, v in inp.indices if v is not None and d in inputs}
@@ -1560,7 +1617,7 @@ def rewrite_blockwise(inputs):
15601617
numblocks=numblocks,
15611618
new_axes=new_axes,
15621619
concatenate=concatenate,
1563-
annotations=inputs[root].annotations,
1620+
annotations=fused_annotations,
15641621
io_deps=io_deps,
15651622
)
15661623

dask/dask-schema.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,21 @@ properties:
9292
type: object
9393
properties:
9494

95+
annotations:
96+
type: object
97+
properties:
98+
fuse:
99+
type: boolean
100+
description: |
101+
If adjacent blockwise layers have different annotations (e.g., one has
102+
retries=3 and another has retries=4), Dask can make an attempt to merge
103+
those annotations according to some simple rules. ``retries`` is set to
104+
the max of the layers, ``priority`` is set to the max of the layers,
105+
``resources`` are set to the max of all the resources, ``workers`` is
106+
set to the intersection of the requested workers, and ``allow_other_workers`` is true only if every layer that sets it allows other workers. If this setting is
107+
disabled, then adjacent blockwise layers with different annotations
108+
will *not* be fused.
109+
95110
fuse:
96111
type: object
97112
description: Options for Dask's task fusion optimizations

dask/dask.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ array:
1919
split-large-chunks: null # How to handle large output chunks in slicing. Warns by default.
2020

2121
optimization:
22+
annotations:
23+
fuse: true # Automatically fuse compatible annotations on layers
2224
fuse:
2325
active: null # Treat as false for dask.dataframe, true for everything else
2426
ave-width: 1

0 commit comments

Comments
 (0)