Use map_blocks in array.store to avoid materialization and dropping of annotations
#11844
Conversation
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
9 files ±0, 9 suites ±0, 3h 30m 34s ⏱️ +1m 17s
Results for commit 6d7dfe2. ± Comparison against base commit e0877d0.
This pull request removes 1 test and adds 2 tests. Note that renamed tests count towards both.
Force-pushed from a9821f7 to 36154cd
Nice, this seems like a great win! Do you expect to see compute-time gains too? Or just at graph submission time?
In most cases, I believe this should be fine since you can also do

Miraculously, both Xarray & Icechunk tests appear to pass! I'd appreciate some guidance on what to do inside Icechunk. The main entrypoint is here: https://github.com/earth-mover/icechunk/blob/3374ca4968e0989b78643f57c8dda1fee0e8da2e/icechunk-python/python/icechunk/dask.py#L35 ; as you can see, I copied a bunch of stuff from

EDIT: I've just added nightly upstream tests to Icechunk, so we should get notified if anything breaks when you merge.
With this PR there are no compute gains expected. I think the fusion of those storage tasks is already handled by low-level fusion, so I don't expect much to change on that end. There is some follow-up work (timeline TBD) that would help reduce CPU load on the scheduler, which could manifest as compute-time gains, but that isn't a top priority right now. The other work (Patrick's open PRs) is going to address compute-time gains, but it needs a bit more time in the oven and isn't stable yet.
I'll have a look.
Pull Request Overview
This PR refactors the store operation of Dask Arrays to use map_blocks for a blockwise operation that avoids unnecessary graph materialization and preserves annotations. Key changes include replacing Delayed-based operations with Array outputs in tests, updating the store implementation in dask/array/core.py to remove low-level graph stitching in favor of map_blocks with persist and compute, and refining the load/store chunk logic with adjusted parameters.
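The core idea can be illustrated with a minimal sketch; this is not the PR's actual implementation, and `_store_chunk` and its wiring are hypothetical. Writing each chunk from a function mapped over the blocks keeps the result a first-class Array, so annotations attached around the call survive:

```python
import numpy as np
import dask.array as da


def _store_chunk(block, target, lock=None, block_info=None):
    # Hypothetical helper: write one block into its slice of the target.
    # block_info[None]["array-location"] gives this block's index ranges
    # in the full array, which map directly to a write region.
    loc = block_info[None]["array-location"]
    region = tuple(slice(start, stop) for start, stop in loc)
    if lock is not None:
        lock.acquire()
    try:
        target[region] = block
    finally:
        if lock is not None:
            lock.release()
    return block


x = da.ones((100, 100), chunks=(10, 10))
target = np.empty((100, 100))

# Blockwise store: one task per chunk, no low-level graph stitching, and
# the result stays a regular Array rather than degrading to Delayed.
stored = da.map_blocks(_store_chunk, x, target=target, dtype=x.dtype)
stored.compute()
```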
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| dask/array/tests/test_array_core.py | Updates to test assertions and lock behavior to reflect new API |
| dask/array/core.py | Refactoring of store, map_blocks, and load_chunk functions |
Comments suppressed due to low confidence (4)
dask/array/tests/test_array_core.py:2102
- Replace the commented-out assertion with a concrete test that verifies stored arrays are persisted correctly, ensuring that the intended behavior is enforced.
# FIXME: The old assert was bad; it asserted that there is only a single
dask/array/tests/test_array_core.py:2337
- Consider adding additional tests for exception scenarios to verify that locks are always released to avoid potential deadlocks in error cases.
assert lock.acquire_count == lock.release_count == a.npartitions + b.npartitions
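A hedged sketch of what such an exception-path test could look like; `CounterLock` and `FailingStore` are hypothetical helpers, not fixtures from dask's test suite:

```python
import threading

import pytest
import dask.array as da


class CounterLock:
    # Hypothetical counting lock: wraps a real lock and records
    # how often acquire/release are called.
    def __init__(self):
        self._lock = threading.Lock()
        self.acquire_count = 0
        self.release_count = 0

    def acquire(self, *args, **kwargs):
        self.acquire_count += 1
        return self._lock.acquire(*args, **kwargs)

    def release(self):
        self.release_count += 1
        return self._lock.release()


class FailingStore:
    # Target whose writes always fail, to exercise the error path.
    shape = (10,)

    def __setitem__(self, key, value):
        raise RuntimeError("write failed")


def test_store_releases_lock_on_error():
    x = da.ones(10, chunks=5)
    lock = CounterLock()
    with pytest.raises(RuntimeError):
        da.store(x, FailingStore(), lock=lock, scheduler="synchronous")
    # Even when a write fails, every acquire must be paired with a release.
    assert lock.acquire_count == lock.release_count
```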
dask/array/core.py:4586
- Ensure that 'fuse_slice' is imported or defined in the module to avoid a potential NameError at runtime.
index = fuse_slice(region, index)
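If the name really is missing, the fix is a one-line import; in dask's source tree `fuse_slice` lives in `dask.array.optimization` (worth double-checking against the branch under review):

```python
from dask.array.optimization import fuse_slice

# Compose the user-supplied region with the block's own index so the
# write lands in the correct sub-region of the target.
if region is not None:
    index = fuse_slice(region, index)
```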
dask/array/core.py:4612
- [nitpick] Consider adding a type annotation for the 'region' parameter for improved clarity and consistency with the rest of the function signature.
def load_chunk(out: A, index: slice, lock: Any, region) -> A:
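A possible annotated signature; the exact type for `region` is an assumption (dask commonly passes a slice, a tuple of slices, or None):

```python
from __future__ import annotations

from typing import Any, TypeVar

A = TypeVar("A")


def load_chunk(
    out: A,
    index: slice,
    lock: Any,
    region: slice | tuple[slice, ...] | None,  # assumed type; verify against callers
) -> A:
    ...
```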
* Use dask.compute() instead of compute() method - dask/dask#11844.
* Remove out-dated test_landsea_unpacking_uses_dask.
* What's New entry.
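The migration for downstream callers (like the iris commit above) is mechanical; a sketch, assuming `da.store(..., compute=False)` is the source of the stored object:

```python
import dask
import dask.array as da
import numpy as np

source = da.ones(100, chunks=10)
target = np.empty(100)

stored = da.store(source, target, compute=False)
# Before this PR `stored` was a Delayed, so stored.compute() worked.
# It is now an Array-like result; dask.compute() accepts either kind of
# collection, which makes downstream code robust to the change.
dask.compute(stored)
```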
This PR rewrites `da.store` to rely on `map_blocks` instead of stitching together various low-level graphs. This brings a few quite significant benefits.

This comes with one breaking change that I believe is not just worth it but also... a feature?? So far, the return type of `da.store` depended on the combination of the kwargs `return_stored` and `compute`. Specifically, in the case of `return_stored=False` and `compute=False` this would return a `Delayed` object. With the proposed change this is no longer the case and the return type is consistently an `Array`. While I get the appeal of having only one thing to call `compute` on, this is also the reason why the complexity of this function was so high: it dropped the collection type (and the annotations...). I believe that if we wanted to preserve the "compute on one thing" feature we should just concat the arrays.

@dcherian I would love to get your input on this. I haven't (yet) tested this with xarray or icechunk. I could imagine that there are some minor breakages, but I believe that overall this change will improve UX and performance significantly.
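To keep a single object to call `compute` on under the new behavior, the concat idea could look roughly like this; a sketch that assumes `store` returns one array per source and flattens each result to cope with mismatched shapes:

```python
import dask.array as da
import numpy as np

sources = [da.ones(100, chunks=10), da.zeros((10, 10), chunks=5)]
targets = [np.empty(100), np.empty((10, 10))]

stored = da.store(sources, targets, compute=False)
# One Array per source instead of a single Delayed; ravel and concatenate
# to get back a single collection that triggers all writes on compute.
combined = da.concatenate([s.ravel() for s in stored])
combined.compute()
```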
While at it, I also fixed how locks are initialized.
Closes #9381
Closes #10074
Closes #10238
Closes #4349
Closes #8380