Add split_every to graph_manipulation #7282
Conversation
```python
    SplitEvery = Union[Number, Literal[False], None]
except ImportError:
    SplitEvery = Union[Number, bool, None]  # type: ignore
```
The alternative to this ugly thing was to add a dependency on `typing_extensions`, which, if we are going to be serious about type annotations, we'll eventually have to do.
Another alternative was to wrap the definition in `if TYPE_CHECKING:`. As long as you have a recent version of mypy, you don't need to maintain backwards compatibility with older versions of Python.
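That alternative might look like the following sketch (the alias names mirror the hunk above; the branch layout here is illustrative, not the PR's actual code). `TYPE_CHECKING` is only `True` while a static type checker runs, so the `Literal` import never executes on an old Python runtime:

```python
from numbers import Number
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    # Only evaluated by mypy and friends, so it is safe to use typing
    # features newer than the oldest supported Python runtime
    from typing import Literal

    SplitEvery = Union[Number, Literal[False], None]
else:
    # At runtime the alias only needs to exist, not be precise
    SplitEvery = Union[Number, bool, None]
```
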
I think this is fine for now. Since Dask development doesn't use type annotations, I'm slightly against adding infrastructure to support them.
That said, if you're passionate about type annotations then I'd encourage you to engage on dask/distributed#2803. I recall previous discussions where type annotations were brought up with mixed reviews, but that was a while ago, so it could be that people's opinions on them have changed since then.
```python
try:
    layers_to_clone = set(child.__dask_layers__())
except AttributeError:
    layers_to_clone = prev_coll_names.copy()
```
These two hunks fix a bug where a collection defines `__dask_layers__`, which returns 2+ layers, but `__dask_graph__` returns a plain dict. Unit tests have been updated to trigger the issue.
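A minimal sketch of that shape (hypothetical class, not the PR's test code): the collection advertises two layers via `__dask_layers__`, while `__dask_graph__` returns a flat dict with no `.layers` attribute, so any code that assumed a `HighLevelGraph` would hit an `AttributeError`:

```python
class FlatGraphCollection:
    """Hypothetical collection: two logical layers, but a flat dict graph."""

    def __dask_layers__(self):
        return ("layer-a", "layer-b")

    def __dask_graph__(self):
        # Plain dict: no .layers attribute, unlike a HighLevelGraph
        return {("layer-a", 0): 1, ("layer-b", 0): 2}


coll = FlatGraphCollection()
graph = coll.__dask_graph__()
assert not hasattr(graph, "layers")

# Ask the collection itself for its layer names, falling back only when
# the collection does not define __dask_layers__ at all
try:
    layers_to_clone = set(coll.__dask_layers__())
except AttributeError:
    layers_to_clone = set()  # placeholder for prev_coll_names.copy()
```
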
```diff
 if is_bound:
-    new_deps[new_layer_name].add(blocker_key)
+    new_dep.add(blocker_key)
+    new_deps[new_layer_name] = new_dep
```
This change is to make mypy happy, as `new_deps` is (correctly) a dict of read-only `AbstractSet`s.
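The distinction matters because `typing.AbstractSet` is the read-only set protocol and has no `.add` method. A sketch of the pattern mypy accepts (key names here are illustrative):

```python
from typing import AbstractSet, Dict, Set

# Maps layer names to *read-only* sets of dependencies
new_deps: Dict[str, AbstractSet[str]] = {}

# new_deps["new-layer"].add("blocker-key")
# ^ mypy rejects this: "AbstractSet[str]" has no attribute "add"

# Accepted pattern: mutate a concrete mutable Set, then assign it wholesale
new_dep: Set[str] = set()
new_dep.add("blocker-key")
new_deps["new-layer"] = new_dep
```
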
@jrbourbeau ready for review
jrbourbeau
left a comment
Thanks for your work on this @crusaderky!
Also, I appreciate you raising #7283 to improve `split_every` consistency across the project.
* upstream/master: (43 commits)
  * bump version to 2021.03.0
  * Bump minimum version of distributed (dask#7328)
  * Fix `percentiles_summary` with `dask_cudf` (dask#7325)
  * Temporarily revert recent Array.__setitem__ updates (dask#7326)
  * Blockwise.clone (dask#7312)
  * NEP-35 duck array update (dask#7321)
  * Don't allow setting `.name` for array (dask#7222)
  * Use nearest interpolation for creating percentiles of integer input (dask#7305)
  * Test `exp` with CuPy arrays (dask#7322)
  * Check that computed chunks have right size and dtype (dask#7277)
  * pytest.mark.flaky (dask#7319)
  * Contributing docs: add note to pull the latest git tags before pip installing Dask (dask#7308)
  * Support for Python 3.9 (dask#7289)
  * Add broadcast-based merge implementation (dask#7143)
  * Add split_every to graph_manipulation (dask#7282)
  * Typo in optimize docs (dask#7306)
  * dask.graph_manipulation support for xarray.Dataset (dask#7276)
  * Add plot width and height support for Bokeh 2.3.0 (dask#7297)
  * Add numpy functions tri, triu_indices, triu_indices_from, tril_indices, tril_indices_from (dask#6997)
  * Remove "cleanup" task in dataframe on-disk shuffle. The partd directory (dask#7260)
  * ...
Although the outputs of the map stage of `dask.graph_manipulation.checkpoint` are just a bunch of `None`s, it's been observed that the distributed scheduler incorrectly concentrates the pre-map data (potentially gigabytes) onto the worker that computes the final node of `checkpoint`. This will eventually need to be fixed in distributed, but it won't be an easy fix. This PR sidesteps the problem by removing the nexus node and replacing it with a recursive aggregation equivalent to the one already implemented in dask.array, dask.bag, and dask.dataframe.
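The recursive aggregation idea can be sketched generically: instead of a single nexus task depending on every map output at once, keys are reduced in a tree where each aggregation node has at most `split_every` children. The function and key names below are illustrative, not the PR's actual implementation:

```python
def tree_reduce(keys, split_every=8):
    """Group keys into batches of at most split_every, layer by layer,
    until a single root remains. Returns the list of (parent, children)
    aggregation tasks that would be added to the graph."""
    tasks = []
    level = 0
    while len(keys) > 1:
        parents = []
        for i in range(0, len(keys), split_every):
            parent = ("agg", level, i // split_every)
            tasks.append((parent, keys[i : i + split_every]))
            parents.append(parent)
        keys = parents
        level += 1
    return tasks


# 20 leaves with split_every=8 -> 3 first-level aggregations, then 1 root,
# so no single task ever depends on all 20 leaves at once
tasks = tree_reduce([("leaf", i) for i in range(20)], split_every=8)
```

This caps the fan-in of every task, so the scheduler never has a reason to co-locate all of the pre-map data on one worker.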
Follow-up: #7283