to_delayed(optimize_graph=False) objects often have incorrect __dask_layers__ #8173

@gjoseph92

to_delayed(optimize_graph=False) on a DataFrame or Array creates semi-broken Delayed objects which can't interoperate.

These objects have keys like ('array-abcde', 0), and __dask_layers__() reports that same key as the layer name, returning (('array-abcde', 0),). However, ('array-abcde', 0) isn't actually a layer in the HighLevelGraph; the correct layer name is 'array-abcde'.

When you try to use these delayed objects together (like ds[0] + ds[1]), they'll produce a HighLevelGraph with invalid dependencies, where the dependencies refer to layers that don't exist.

import dask.array as da
a = da.ones(3, chunks=1)
b = a + 1  # oddly without this (when there's only 1 blockwise operation), things work
ds = b.to_delayed(optimize_graph=False)
d = ds[0]
# assert all(layer in d.dask.layers for layer in d.__dask_layers__())  # this fails
x = ds[0] + ds[1]
x.dask
---------------------------------------------------------------------------
...
~/dev/dask/dask/highlevelgraph.py in __repr__(self)
   1129         representation = f"{type(self).__name__} with {len(self.layers)} layers.\n"
   1130         representation += f"<{self.__class__.__module__}.{self.__class__.__name__} object at {hex(id(self))}>\n"
-> 1131         for i, layerkey in enumerate(self._toposort_layers()):
   1132             representation += f" {i}. {layerkey}\n"
   1133         return representation

~/dev/dask/dask/highlevelgraph.py in _toposort_layers(self)
    903         for k, v in self.dependencies.items():
    904             for dep in v:
--> 905                 reverse_deps[dep].append(k)
    906             if not v:
    907                 ready.append(k)

KeyError: ('add-e56f0f399b1e10776fb67d114bf64ab5', 0)
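
To make the failure mode concrete without needing dask, here is a minimal pure-Python sketch of a topological sort over a layer-dependency mapping. It is a simplified stand-in written for illustration, not dask's actual `_toposort_layers`; the function name `toposort_layers` and the layer names are made up. It assumes the reverse-dependency table only has entries for names that are themselves layers, which is what makes a dependency on a task key blow up.

```python
def toposort_layers(dependencies):
    """Toy topological sort over a {layer: set_of_dependency_layers} mapping.

    Illustrative stand-in only; not dask's implementation.
    """
    # Only names that are themselves layers get an entry here.
    reverse_deps = {layer: [] for layer in dependencies}
    degree = {layer: len(deps) for layer, deps in dependencies.items()}
    ready = []
    for layer, deps in dependencies.items():
        for dep in deps:
            # Raises KeyError if `dep` is not the name of a layer.
            reverse_deps[dep].append(layer)
        if not deps:
            ready.append(layer)

    order = []
    while ready:
        layer = ready.pop()
        order.append(layer)
        for child in reverse_deps[layer]:
            degree[child] -= 1
            if degree[child] == 0:
                ready.append(child)
    return order


# Well-formed: every dependency names a real layer.
good = {"ones-abc": set(), "add-def": {"ones-abc"}, "add-xyz": {"add-def"}}
good_order = toposort_layers(good)

# Malformed: the new layer depends on task keys such as ("add-def", 0),
# not on the layer "add-def" that actually holds those tasks.
bad = {
    "ones-abc": set(),
    "add-def": {"ones-abc"},
    "add-xyz": {("add-def", 0), ("add-def", 1)},
}
try:
    toposort_layers(bad)
    bad_error = None
except KeyError as exc:
    bad_error = exc.args[0]
```

In the malformed case the sort fails on the first tuple it meets, which mirrors the KeyError in the traceback above.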

You can still compute these objects fine, I think because Delayed doesn't include optimize_blockwise in its default optimizers.

But if you turn the Delayeds back into a DataFrame/Array, then it will break during optimization. (EDIT: only after #8174, since otherwise we're throwing away input HLGs entirely.)

Here's a test showing how this breaks roundtripping from collection to delayed and back again:

diff --git a/dask/dataframe/io/tests/test_io.py b/dask/dataframe/io/tests/test_io.py
index 44029818..8b43d3c7 100644
--- a/dask/dataframe/io/tests/test_io.py
+++ b/dask/dataframe/io/tests/test_io.py
@@ -9,6 +9,7 @@ import dask.array as da
 import dask.dataframe as dd
 from dask.dataframe._compat import tm
 from dask.dataframe.io.io import _meta_from_array
+from dask.dataframe.io.demo import make_timeseries
 from dask.dataframe.utils import assert_eq, is_categorical_dtype
 from dask.delayed import Delayed, delayed
 from dask.utils import tmpfile
@@ -697,6 +698,16 @@ def test_to_delayed_optimize_graph():
     assert_eq(dx.compute(), dx2.compute())
 
 
+@pytest.mark.parametrize("optimize_graph", [False, True])
+def test_delayed_roundtrip(optimize_graph):
+    df = make_timeseries(start="2000-01-01", end="2000-01-02", freq="60s", partition_freq="6h")
+    df2 = df.assign(z=lambda df: df.x + df.y)
+    ds = df2.to_delayed(optimize_graph=optimize_graph)
+    ds_recombined = delayed(lambda *args: args, nout=len(ds))(*ds)
+    df_from_delayed = dd.from_delayed(list(ds_recombined), meta=df2, divisions=df2.divisions)
+    assert_eq(df2, df_from_delayed)
+
+
 def test_from_dask_array_index_dtype():
     x = da.ones((10,), chunks=(5,))

Anything else we need to know?:

The problem is that Delayed and collections just have a different data model for dask keys and how they correspond to HLG layers. Collections have a base name, multiple keys, and expect __dask_layers__ to match that name. Delayed objects just have a single key, and expect __dask_layers__ to match that key.
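
The mismatch between the two data models can be sketched in a few lines of plain Python. Nothing here is dask code: `collection_keys`, `collection_layers`, and `delayed_layers` are hypothetical helpers that only mimic what each kind of object reports, and the layer names are invented.

```python
def collection_keys(name, nchunks):
    """A collection has one base name and one (name, i) key per chunk."""
    return [(name, i) for i in range(nchunks)]


def collection_layers(name):
    """A collection reports its base name as its layer."""
    return (name,)


def delayed_layers(key):
    """A Delayed-style object reports its single key as its layer."""
    return (key,)


# Layer names of a toy graph built by a collection.
graph_layers = {"ones-abc", "add-def"}

keys = collection_keys("add-def", 3)

# The collection's view matches the graph.
collection_ok = all(
    layer in graph_layers for layer in collection_layers("add-def")
)

# Handing one of the collection's keys to a Delayed-style object does not:
# ("add-def", 0) is a task key, not a layer name.
delayed_ok = all(layer in graph_layers for layer in delayed_layers(keys[0]))
```

Here `collection_ok` is true and `delayed_ok` is false, which is the inconsistency described above.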

So technically you can't just give a Delayed object the HLG of a collection. However, adding a renaming layer to the HLG just to work around this problem feels silly.

Instead, maybe Delayed objects should have an optional name (or layer_name) attribute that collections can set in to_delayed?
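
As a rough sketch of what that could look like, assuming a toy class rather than the real `Delayed`: `ToyDelayed`, its `layer_name` argument, `dask_layers`, and `toy_to_delayed` are all hypothetical names for illustration, not existing dask API.

```python
class ToyDelayed:
    """Toy stand-in for Delayed with an optional, explicit layer name."""

    def __init__(self, key, layer_name=None):
        self.key = key
        # Hypothetical attribute: set by the collection in to_delayed().
        self.layer_name = layer_name

    def dask_layers(self):
        # Prefer the explicit layer name; otherwise fall back to the key,
        # which is what a plain Delayed object expects.
        if self.layer_name is not None:
            return (self.layer_name,)
        return (self.key,)


def toy_to_delayed(name, nchunks):
    """Mimic a collection handing out one Delayed-like object per chunk."""
    return [ToyDelayed((name, i), layer_name=name) for i in range(nchunks)]


ds = toy_to_delayed("add-def", 2)
from_collection = ds[0].dask_layers()

# A user-created object with a tuple key makes no assumption about
# key structure and keeps reporting its full key.
user_made = ToyDelayed(("x", 0))
from_user = user_made.dask_layers()
```

The point of this design is that the collection states the layer name explicitly, so nothing has to be inferred from the shape of the key.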

Alternatively, we could just check if the key is a tuple in Delayed.__dask_layers__, and return the first element if so. I don't like this because I don't think we should be making assumptions about key structure like this. And since users can specify keys, doing delayed(x0, name=("x", 0)) + delayed(x1, name=("x", 1)) would then break.

But this does make my test pass:
diff --git a/dask/delayed.py b/dask/delayed.py
index 8d80f19f..14a97969 100644
--- a/dask/delayed.py
+++ b/dask/delayed.py
@@ -507,10 +507,20 @@ class Delayed(DaskMethodsMixin, OperatorMethodMixin):
     def __dask_layers__(self):
         # Delayed objects created with .to_delayed() have exactly
         # one layer which may have a non-canonical name "delayed-<original name>"
-        if isinstance(self.dask, HighLevelGraph) and len(self.dask.layers) == 1:
-            return tuple(self.dask.layers)
+        if isinstance(self.dask, HighLevelGraph):
+            if len(self.dask.layers) == 1:
+                layer = next(iter(self.dask.layers))
+            else:
+                layer = self.key
+
+            if isinstance(layer, tuple):
+                # Delayed objects created with .to_delayed() may have tuple keys;
+                # the layer name in the HLG is the original collection's name, which
+                # should be the first element in the key tuple.
+                layer = layer[0]
         else:
-            return (self.key,)
+            layer = self.key
+        return (layer,)
 
     def __dask_tokenize__(self):
         return self.key
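
The objection to this heuristic (user-specified tuple keys) can be shown with a standalone copy of the logic from the diff. This is a sketch outside of dask: `layers_with_tuple_heuristic` is a hypothetical free function, and it assumes that a user-named Delayed object's single layer is named after its key, as described earlier.

```python
def layers_with_tuple_heuristic(key, graph_layers):
    """Standalone copy of the tuple-key heuristic from the diff above."""
    if len(graph_layers) == 1:
        layer = next(iter(graph_layers))
    else:
        layer = key
    if isinstance(layer, tuple):
        # Assume the first element of a tuple key is the layer name.
        layer = layer[0]
    return (layer,)


# Case 1: object made by to_delayed(); the graph has collection layers.
collection_graph = {"ones-abc": None, "add-def": None}
from_collection = layers_with_tuple_heuristic(("add-def", 0), collection_graph)

# Case 2: something like delayed(x0, name=("x", 0)); assumed here to have
# one layer named after the full key ("x", 0).
user_graph = {("x", 0): None}
from_user = layers_with_tuple_heuristic(("x", 0), user_graph)

collection_case_valid = from_collection[0] in collection_graph
user_case_valid = from_user[0] in user_graph
```

The heuristic repairs the `to_delayed` case but reports `"x"` for the user-named object, which is not a layer in that object's graph.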

Environment:

  • Dask version: main
  • Python version: 3.8.8
  • Operating System: macOS
  • Install method (conda, pip, source): source

cc @rjzamora @jcrist

Labels: delayed, needs attention
