What happened:
I'm trying to take the result of a map_blocks call and store one slice of the resulting array in one zarr array and another slice in a second zarr array. I set compute=False on the da.store calls so that I can compute them together later and avoid computing the source array multiple times. However, the map_blocks function is called once per store operation; from what I can tell, the shared portion of the dependency graph is getting lost.
What you expected to happen:
Two da.store calls whose dask graphs share tasks should share those computations when computed together.
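For comparison, here is a minimal sketch (my own illustration, not part of the failing example) of the sharing I mean: dask.compute merges the graphs of everything passed to it by task key, so tasks common to both arguments run only once.

import dask
import dask.array as da

x = da.ones((4, 4), chunks=2)
# x.sum() and x.mean() both depend on the same chunk tasks of x.
# dask.compute merges the two graphs by key, so each chunk of x is
# built only once while computing both results.
total, mean = dask.compute(x.sum(), x.mean())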
Minimal Complete Verifiable Example:
import dask
import dask.array as da
import numpy as np

TOTAL_CALLS = 0

def shared_task2(arr1):
    global TOTAL_CALLS
    TOTAL_CALLS += 1
    return np.stack([arr1 + 1, arr1 + 2])

if __name__ == "__main__":
    start = da.zeros((2, 2), chunks=1)
    # src has shape (2, 2, 2); src[0] and src[1] both depend on the
    # same shared_task2 tasks.
    src = da.map_blocks(shared_task2, start, dtype=start.dtype,
                        meta=np.array((), dtype=start.dtype),
                        new_axis=[0],
                        chunks=(2,) + start.chunks)
    target1 = np.zeros((2, 2))
    target2 = np.zeros((2, 2))
    with dask.config.set(scheduler='single-threaded'):
        store_res1 = da.store(src[0], target1, compute=False)
        store_res2 = da.store(src[1], target2, compute=False)
        da.compute(store_res1, store_res2)
    # one call per chunk
    assert TOTAL_CALLS == start.blocks.size

Anything else we need to know?:
As mentioned above, I'm actually trying to use to_zarr to save some dask arrays. I want to write them to zarr and then get back dask arrays backed by the resulting zarr stores, but I couldn't find a way to do that with the combination of return_stored=True and compute=False: you have to compute to write to the zarr arrays, but computing then returns the resulting numpy arrays rather than the zarr-backed ones.
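To make the goal concrete, here is a sketch of the behavior I'm after (my own illustration; z1 and z2 are hypothetical pre-created zarr arrays, and src is the array from the MCVE above):

import dask
import dask.array as da
import zarr

# Hypothetical pre-created zarr targets for the two slices.
z1 = zarr.zeros((2, 2), chunks=(1, 1))
z2 = zarr.zeros((2, 2), chunks=(1, 1))

# Write both slices together (ideally sharing the shared_task2 calls,
# which is what this issue is about) ...
dask.compute(
    da.store(src[0], z1, compute=False),
    da.store(src[1], z2, compute=False),
)

# ... then reopen the written data as lazy, zarr-backed dask arrays,
# rather than the in-memory numpy arrays that computing a
# return_stored=True store hands back.
out1 = da.from_zarr(z1)
out2 = da.from_zarr(z2)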
Also note that my map_blocks function returns an np.stack because it actually produces two results per block.
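For what it's worth, passing both slices to a single da.store call looks like a possible workaround (a sketch reusing src, target1, and target2 from the MCVE; a single call builds one merged graph, so the shared tasks should only appear once):

# Possible workaround, reusing the MCVE's variables: one da.store call
# with lists of sources and targets builds a single graph, so the
# shared_task2 tasks should run once per chunk.
da.store([src[0], src[1]], [target1, target2])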
Environment:
- Dask version: 2021.11.1
- Python version: 3.9
- Operating System: Ubuntu/PopOS
- Install method (conda, pip, source): conda-forge