
Conversation

@fjetter
Member

@fjetter fjetter commented Mar 24, 2025

This PR rewrites da.store to rely on map_blocks instead of stitching together various low-level graphs.

This brings a few significant benefits:

  • It is a blockwise operation and can therefore be properly fused with the incoming layer, which means fewer tasks and much faster optimization
  • More importantly, it eliminates the requirement to optimize the input collections ahead of time, which forced a graph materialization. In particular, combined with Wrap HLGs in an Expr to avoid Client side materialization #11736, this finally allows us to avoid client side materialization for arrays 🎉

This comes with one breaking change that I believe is not just worth it but arguably a feature. So far, the return type of da.store depended on the combination of the kwargs return_stored and compute: specifically, with return_stored=False and compute=False it returned a Delayed object. With the proposed change, the return type is consistently an Array (see the sketch below). While I get the appeal of having only one thing to call compute on, this is also the reason the complexity of this function was so high: it dropped the collection type (and the annotations...). If we wanted to preserve the "compute on one thing" feature, we should just concatenate the arrays.
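
A minimal sketch of the behavior change, assuming a NumPy array as the target (any object supporting NumPy-style slice assignment works):

import dask
import dask.array as da
import numpy as np

x = da.ones((100, 100), chunks=(10, 10))
target = np.zeros((100, 100))  # any object supporting __setitem__

# Before this PR, compute=False with return_stored=False returned a single
# Delayed; with this change the lazy result is consistently a dask Array.
lazy = da.store(x, target, compute=False)
dask.compute(lazy)  # triggers the writes into `target`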

@dcherian I would love to get your input on this. I haven't (yet) tested this with xarray or icechunk. I could imagine that there are some minor breakages but I believe overall this change will improve UX and performance significantly.

While at it, I also fixed how locks are initialized.

Closes #9381
Closes #10074
Closes #10238
Closes #4349
Closes #8380

@fjetter fjetter changed the title Use map_blocks in array store to avoid materialization and dropping of annotations Use map_blocks in array.store to avoid materialization and dropping of annotations Mar 24, 2025
@github-actions
Contributor

github-actions bot commented Mar 24, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

      9 files  ±0        9 suites  ±0   3h 30m 34s ⏱️ +1m 17s
 17 820 tests  +1    16 606 ✅  +2    1 214 💤  −1   0 ❌ ±0
159 454 runs   +9   147 350 ✅ +19   12 104 💤 −10   0 ❌ ±0

Results for commit 6d7dfe2. Comparison against base commit e0877d0.

This pull request removes 1 test and adds 2 tests. Note that renamed tests count towards both.

Removed:
  • dask.tests.test_base ‑ test_optimizations_ctd

Added:
  • dask.array.tests.test_array_core ‑ test_store_locks_failure_lock_released
  • dask.tests.test_hlgexpr ‑ test_tokenize


@fjetter fjetter force-pushed the array_store_map_blocks branch from a9821f7 to 36154cd on March 24, 2025 at 13:48
@dcherian
Collaborator

dcherian commented Mar 25, 2025

Nice, this seems like a great win!

Do you expect to see compute-time gains too? Or just at graph submission time?

With the proposed change, the return type is consistently an Array.

In most cases, I believe this should be fine since you can also call dask.compute() on an Array. So the only people affected are those explicitly checking isinstance(value, dask.delayed.Delayed). I'm not sure who does that (perhaps rioxarray/odc-geo)?
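
For illustration, a hypothetical sketch of the kind of downstream check that would be affected (not taken from any specific project):

import dask
import dask.array as da
import numpy as np
from dask.delayed import Delayed

x = da.ones((10, 10), chunks=(5, 5))
target = np.zeros((10, 10))
lazy = da.store(x, target, compute=False)

# Code that special-cased the old return type would now take the other
# branch, since `lazy` is an Array rather than a Delayed:
if isinstance(lazy, Delayed):
    lazy.compute()      # old behavior: a single Delayed
else:
    dask.compute(lazy)  # new behavior: consistently an Array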


Miraculously both Xarray & Icechunk tests appear to pass!

I'd appreciate some guidance on what to do inside Icechunk. The main entrypoint is here: https://github.com/earth-mover/icechunk/blob/3374ca4968e0989b78643f57c8dda1fee0e8da2e/icechunk-python/python/icechunk/dask.py#L35 ; as you can see I copied a bunch of stuff from dask.array.store


EDIT: I've just added nightly upstream tests to icechunk so we should get notified if anything breaks when you merge.

@fjetter
Member Author

fjetter commented Mar 26, 2025

Do you expect to see compute-time gains too? Or just at graph submission time?

With this PR there are no compute gains expected. I think the fusion of those storage tasks is already handled by low-level fusion, so I don't expect much to change on that end.

There is some follow-up work (timeline TBD) that would help reduce CPU load on the scheduler, which could manifest as compute-time gains, but that is not a top priority right now.

The other work (Patrick's open PRs) will address compute-time gains, but it also needs a bit more time in the oven and isn't stable yet.

I'd appreciate some guidance on what to do inside Icechunk. The main entrypoint is here:

I'll have a look


Copilot AI left a comment


Pull Request Overview

This PR refactors the store operation of Dask Arrays to use map_blocks for a blockwise operation that avoids unnecessary graph materialization and preserves annotations. Key changes include: replacing Delayed-based operations with Array outputs in tests; updating the store implementation in dask/array/core.py to remove low-level graph stitching in favor of map_blocks with persist and compute; and refining the load/store chunk logic with adjusted parameters. A rough sketch of the resulting pattern is shown below.
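
A minimal sketch of the map_blocks-based store pattern described above (not the actual implementation; the real code also handles locks, regions, and return_stored):

import dask.array as da
import numpy as np

def store_chunk(block, target=None, block_info=None):
    # block_info[None] describes the current output block, including its
    # location in the overall array as (start, stop) pairs per axis.
    loc = block_info[None]["array-location"]
    region = tuple(slice(start, stop) for start, stop in loc)
    target[region] = block
    return block

x = da.ones((100, 100), chunks=(10, 10))
target = np.zeros((100, 100))

# Each chunk writes itself into `target`; the result is an ordinary Array
# that can be fused with upstream layers and then computed or persisted.
stored = x.map_blocks(store_chunk, target=target, dtype=x.dtype)
stored.compute()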

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

Files reviewed:
  • dask/array/tests/test_array_core.py: updates to test assertions and lock behavior to reflect the new API
  • dask/array/core.py: refactoring of the store, map_blocks, and load_chunk functions
Comments suppressed due to low confidence (4)

dask/array/tests/test_array_core.py:2102

  • Replace the commented out assertion with a concrete test that verifies stored arrays are persisted correctly, ensuring that the intended behavior is enforced.
# FIXME: The old assert was bad. It asserted that there is only a single

dask/array/tests/test_array_core.py:2337

  • Consider adding additional tests for exception scenarios to verify that locks are always released to avoid potential deadlocks in error cases.
assert lock.acquire_count == lock.release_count == a.npartitions + b.npartitions

dask/array/core.py:4586

  • Ensure that 'fuse_slice' is imported or defined in the module to avoid a potential NameError at runtime.
index = fuse_slice(region, index)

dask/array/core.py:4612

  • [nitpick] Consider adding a type annotation for the 'region' parameter for improved clarity and consistency with the rest of the function signature.
def load_chunk(out: A, index: slice, lock: Any, region) -> A:
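
A sketch of the suggested annotation (the exact type of region in dask/array/core.py is an assumption here):

from typing import Any, TypeVar

A = TypeVar("A")

# Hypothetical annotated signature; `region` is assumed to be a tuple of
# slices or None, matching how regions are typically passed to da.store.
def load_chunk(out: A, index: slice, lock: Any, region: "tuple[slice, ...] | None") -> A:
    ...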

@fjetter fjetter merged commit cebcc04 into dask:main Apr 1, 2025
23 of 24 checks passed
trexfeathers added a commit to trexfeathers/iris that referenced this pull request May 1, 2025
trexfeathers added a commit to trexfeathers/iris that referenced this pull request May 9, 2025
bjlittle pushed a commit to SciTools/iris that referenced this pull request May 9, 2025
* Use dask.compute() instead of compute() method - dask/dask#11844.

* Remove out-dated test_landsea_unpacking_uses_dask.

* What's New entry.
