Description
What happened:
Calling graph.validate() raises a ValueError on a graph that validated successfully in 2021.9.1.
This graph creates a dataframe per chunk of an array, then merges the dataframes along one axis of the array. A final per-chunk analysis is then run, where a merged dataframe is used in the analysis of each chunk that was used to make it.
What you expected to happen:
The constructed graph should validate.
Minimal Complete Verifiable Example:
import functools
import operator

import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np

META = dd.utils.make_meta([("foo", int)])


def test_sequential():
    image = da.zeros((2, 1), chunks=(1, -1))
    indices = list(np.ndindex(*image.numblocks))
    slices = da.core.slices_from_chunks(image.chunks)
    chunks = list(map(functools.partial(operator.getitem, image), slices))

    # Create one dataframe per chunk.
    ddf_all = np.empty(image.numblocks, dtype=object)
    for idx_chunk, chunk in zip(indices, chunks):
        ddf_delayed = dask.delayed(lambda: None)(chunk)
        ddf_all[idx_chunk] = dd.from_delayed(ddf_delayed, meta=META)

    # Merge dataframes along first axis.
    ddf_array = np.empty(image.numblocks[2:], dtype=object)
    for idx_spatial in np.ndindex(*ddf_array.shape):
        idx_all = (...,) + idx_spatial
        ddf = dd.concat(ddf_all[idx_all].flatten().tolist())
        ddf_array[idx_spatial] = ddf

    # Run a per-chunk calculation using the merged dataframes,
    # where a single merged dataframe is passed back to all the chunks that
    # went into making it.
    signal = []
    for idx_chunk, chunk in zip(indices, chunks):
        ddf = ddf_array[idx_chunk[2:]]
        ddf_signal_chunk = dask.delayed(lambda: None)(chunk, ddf)
        signal.append(ddf_signal_chunk)

    ddf_signal = dd.from_delayed(signal, meta=META)
    ddf_signal.__dask_graph__().validate()  # FAILS

Output:
E ValueError: incorrect dependencies['finalize-287aac88-1505-4627-8949-4d20bf676833']: {'getitem-5a7a378a35301a5d6727950e7814a5c2'} expected {'concat-732322a8bf255193f89c61560c08bcdb'}
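The error message compares the layer-level dependencies recorded on the graph with a different set that validation expected. To see what is recorded, the layers and their dependencies can be printed directly. The following is only a minimal sketch on a small, unrelated array graph (the names x and y are illustrative and not part of the example above). It assumes the collection's graph is a HighLevelGraph exposing `layers` and `dependencies`.

```python
import dask.array as da

# Illustrative graph only; this is not the failing graph from the report.
x = da.zeros((2, 1), chunks=(1, -1))
y = (x + 1).sum()

# Assumption: for this collection, __dask_graph__() returns a HighLevelGraph.
graph = y.__dask_graph__()

# Print each layer name with the layer names it is recorded as depending on.
for name in graph.layers:
    print(name, "->", sorted(graph.dependencies.get(name, set())))

# Raises ValueError if the recorded dependencies are inconsistent.
graph.validate()
```

Running the same loop on ddf_signal.__dask_graph__() from the example above should show the dependencies recorded for the finalize layer named in the error.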
Environment:
- Dask version: 2021.10.0
- Python version: 3.8.10
- Operating System: Ubuntu 20.1
- Install method (conda, pip, source): pip