Description
to_delayed(optimize_graph=False) on a DataFrame or Array creates semi-broken Delayed objects that can't interoperate with other Dask objects.
These objects have keys like ('array-abcde', 0), and calling __dask_layers__ on them also gives you ('array-abcde', 0). However, ('array-abcde', 0) isn't actually a layer in the HighLevelGraph; the layer's correct name is 'array-abcde'.
When you try to use these delayed objects together (like ds[0] + ds[1]), they produce a HighLevelGraph with invalid dependencies: the dependencies refer to layers that don't exist.
```python
import dask.array as da

a = da.ones(3, chunks=1)
b = a + 1  # oddly, without this (when there's only 1 blockwise operation) things work
ds = b.to_delayed(optimize_graph=False)
d = ds[0]
# assert d.__dask_layers__() in d.dask.layers  # this is False
x = ds[0] + ds[1]
x.dask
```
```
---------------------------------------------------------------------------
...
~/dev/dask/dask/highlevelgraph.py in __repr__(self)
   1129         representation = f"{type(self).__name__} with {len(self.layers)} layers.\n"
   1130         representation += f"<{self.__class__.__module__}.{self.__class__.__name__} object at {hex(id(self))}>\n"
-> 1131         for i, layerkey in enumerate(self._toposort_layers()):
   1132             representation += f" {i}. {layerkey}\n"
   1133         return representation

~/dev/dask/dask/highlevelgraph.py in _toposort_layers(self)
    903         for k, v in self.dependencies.items():
    904             for dep in v:
--> 905                 reverse_deps[dep].append(k)
    906             if not v:
    907                 ready.append(k)

KeyError: ('add-e56f0f399b1e10776fb67d114bf64ab5', 0)
```

I think because delayed doesn't use optimize_blockwise in its default optimizers, you can still compute these objects fine.
But if you turn the Delayeds back into a DataFrame/Array, then it will break during optimization. (EDIT: only after #8174, since otherwise we're throwing away input HLGs entirely.)
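The KeyError comes from HighLevelGraph._toposort_layers, which assumes every dependency is a layer name. A simplified, self-contained sketch (not the real dask implementation; layer names abbreviated) shows how a key-shaped dependency blows up:

```python
def toposort_layers(dependencies):
    # Simplified version of HighLevelGraph._toposort_layers: build a
    # reverse-dependency map keyed by layer name, then run Kahn's algorithm.
    degree = {k: len(v) for k, v in dependencies.items()}
    reverse_deps = {k: [] for k in dependencies}
    ready = []
    for k, v in dependencies.items():
        for dep in v:
            reverse_deps[dep].append(k)  # KeyError if dep isn't a layer name
        if not v:
            ready.append(k)
    order = []
    while ready:
        layer = ready.pop()
        order.append(layer)
        for child in reverse_deps[layer]:
            degree[child] -= 1
            if degree[child] == 0:
                ready.append(child)
    return order

# Healthy graph: dependencies refer to real layer names.
good_deps = {"array-abcde": set(), "add-123": {"array-abcde"}}
print(toposort_layers(good_deps))  # ['array-abcde', 'add-123']

# Broken graph: a dependency uses the *key* ('array-abcde', 0) instead of
# the layer name 'array-abcde', as the buggy __dask_layers__ reports it.
bad_deps = {"array-abcde": set(), "add-123": {("array-abcde", 0)}}
try:
    toposort_layers(bad_deps)
except KeyError as e:
    print("KeyError:", e)
```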
Here's a test showing how this breaks roundtripping from collection to delayed and back again:
```diff
diff --git a/dask/dataframe/io/tests/test_io.py b/dask/dataframe/io/tests/test_io.py
index 44029818..8b43d3c7 100644
--- a/dask/dataframe/io/tests/test_io.py
+++ b/dask/dataframe/io/tests/test_io.py
@@ -9,6 +9,7 @@ import dask.array as da
 import dask.dataframe as dd
 from dask.dataframe._compat import tm
 from dask.dataframe.io.io import _meta_from_array
+from dask.dataframe.io.demo import make_timeseries
 from dask.dataframe.utils import assert_eq, is_categorical_dtype
 from dask.delayed import Delayed, delayed
 from dask.utils import tmpfile
@@ -697,6 +698,16 @@ def test_to_delayed_optimize_graph():
     assert_eq(dx.compute(), dx2.compute())


+@pytest.mark.parametrize("optimize_graph", [False, True])
+def test_delayed_roundtrip(optimize_graph):
+    df = make_timeseries(start="2000-01-01", end="2000-01-02", freq="60s", partition_freq="6h")
+    df2 = df.assign(z=lambda df: df.x + df.y)
+    ds = df2.to_delayed(optimize_graph=optimize_graph)
+    ds_recombined = delayed(lambda *args: args, nout=len(ds))(*ds)
+    df_from_delayed = dd.from_delayed(list(ds_recombined), meta=df2, divisions=df2.divisions)
+    assert_eq(df2, df_from_delayed)
+
+
 def test_from_dask_array_index_dtype():
     x = da.ones((10,), chunks=(5,))
```

Anything else we need to know?:
The problem is that Delayed and collections just have a different data model for dask keys and how they correspond to HLG layers. Collections have a base name, multiple keys, and expect __dask_layers__ to match that name. Delayed objects just have a single key, and expect __dask_layers__ to match that key.
So technically you can't just give a Delayed object the HLG of a collection. However, adding a renaming layer to the HLG just to work around this problem feels silly.
Instead, maybe Delayed objects should have an optional name (or layer_name) attribute that collections can set in to_delayed?
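A rough sketch of what that could look like (hypothetical: the class below is a toy stand-in, and `layer` is not a real attribute of `dask.delayed.Delayed` today):

```python
class Delayed:
    """Toy stand-in for dask.delayed.Delayed, sketching the proposal."""

    def __init__(self, key, dsk, layer=None):
        self.key = key
        self.dask = dsk
        # Hypothetical attribute: a collection's to_delayed() would pass
        # its HLG layer name here; plain delayed() leaves it unset.
        self._layer = layer

    def __dask_layers__(self):
        # Prefer the explicit layer name; fall back to the key, which is
        # the current behaviour for ordinary Delayed objects.
        return (self._layer,) if self._layer is not None else (self.key,)


# A collection's to_delayed() could set the layer name explicitly:
d = Delayed(("array-abcde", 0), {}, layer="array-abcde")
print(d.__dask_layers__())  # ('array-abcde',)

# Plain delayed objects keep key == layer name:
p = Delayed("inc-1234", {})
print(p.__dask_layers__())  # ('inc-1234',)
```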
Alternatively, we could just check if the key is a tuple in Delayed.__dask_layers__, and return the first element if so. I don't like this because I don't think we should be making assumptions about key structure like this. And since users can specify keys, doing delayed(x0, name=("x", 0)) + delayed(x1, name=("x", 1)) would then break.
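To make that fragility concrete, here's the heuristic in isolation (illustrative helper, not dask code):

```python
def layers_from_tuple_key(key):
    # The heuristic: assume a tuple key has the shape (layer_name, index)
    # and report the first element as the layer name.
    return (key[0],) if isinstance(key, tuple) else (key,)


# For a key produced by to_delayed() this recovers the right layer name:
print(layers_from_tuple_key(("array-abcde", 0)))  # ('array-abcde',)

# But a user-supplied tuple key like delayed(x0, name=("x", 0)) gets a
# layer actually named ("x", 0) in its graph, so reporting "x" would
# point at a layer that doesn't exist.
print(layers_from_tuple_key(("x", 0)))  # ('x',)
```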
But this change does make my test pass:
```diff
diff --git a/dask/delayed.py b/dask/delayed.py
index 8d80f19f..14a97969 100644
--- a/dask/delayed.py
+++ b/dask/delayed.py
@@ -507,10 +507,20 @@ class Delayed(DaskMethodsMixin, OperatorMethodMixin):
     def __dask_layers__(self):
         # Delayed objects created with .to_delayed() have exactly
         # one layer which may have a non-canonical name "delayed-<original name>"
-        if isinstance(self.dask, HighLevelGraph) and len(self.dask.layers) == 1:
-            return tuple(self.dask.layers)
+        if isinstance(self.dask, HighLevelGraph):
+            if len(self.dask.layers) == 1:
+                layer = next(iter(self.dask.layers))
+            else:
+                layer = self.key
+
+            if isinstance(layer, tuple):
+                # Delayed objects created with .to_delayed() may have tuple keys;
+                # the layer name in the HLG is the original collection's name, which
+                # should be the first element in the key tuple.
+                layer = layer[0]
         else:
-            return (self.key,)
+            layer = self.key
+        return (layer,)

     def __dask_tokenize__(self):
         return self.key
```

Environment:
- Dask version: main
- Python version: 3.8.8
- Operating System: macOS
- Install method (conda, pip, source): source