Skip to content

Add split_every to graph_manipulation#7282

Merged
jrbourbeau merged 9 commits intodask:masterfrom
crusaderky:split_every
Mar 3, 2021
Merged

Add split_every to graph_manipulation#7282
jrbourbeau merged 9 commits intodask:masterfrom
crusaderky:split_every

Conversation

@crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented Feb 26, 2021

Although the outputs of the map stage of dask.graph_manipulation.checkpoint are just a bunch of None's, it's been observed that the distributed scheduler incorrectly concentrates the pre-map data (potentially gigabytes) onto the worker that computes the final node of checkpoint. This will eventually need to be fixed in distributed but won't be an easy fix.

This PR sidesteps the problem, removing the nexus node and replacing it with a recursive aggregation equivalent to the one already implemented in dask.array, dask.bag, and dask.dataframe.

Follow-up: #7283

@crusaderky crusaderky changed the title WIP add split_every to graph_manipulation Add split_every to graph_manipulation Mar 2, 2021
@crusaderky crusaderky marked this pull request as ready for review March 2, 2021 21:42

SplitEvery = Union[Number, Literal[False], None]
except ImportError:
SplitEvery = Union[Number, bool, None] # type: ignore
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative to this ugly thing was to add a dependency to typing_extensions. Which, if we are going to be serious with type annotations, we'll eventually have to do.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative was to wrap the definition in if TYPE_CHECKING. As long as you have a recent version of mypy, you don't need to maintain backwards compatibility with older versions of Python.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine for now. Since Dask development doesn't use type annotations I'm slightly against adding infrastructure to support them.

That said, if you're passionate about type annotations then I'd encourage you to engage on dask/distributed#2803. I recall previous discussions where type annotations were brought up with mixed reviews, but that was a while ago so it could be that people's opinions on them have changes since then.

try:
layers_to_clone = set(child.__dask_layers__())
except AttributeError:
layers_to_clone = prev_coll_names.copy()
Copy link
Collaborator Author

@crusaderky crusaderky Mar 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two hunks fix a bug where a collection defines __dask_layers__ which returns 2+ layers, but __dask_graph__ returns a plain dict. Unit tests have been updated to trigger the issue.

if is_bound:
new_deps[new_layer_name].add(blocker_key)
new_dep.add(blocker_key)
new_deps[new_layer_name] = new_dep
Copy link
Collaborator Author

@crusaderky crusaderky Mar 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change is to make mypy happy, as new_deps is (correctly) a dict of read-only AbstractSets.

@crusaderky
Copy link
Collaborator Author

@jrbourbeau ready for review

@crusaderky crusaderky closed this Mar 2, 2021
@crusaderky crusaderky reopened this Mar 2, 2021
Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your work on this @crusaderky!

Also, I appreciate you raising #7283 to improve split_every consistency across the project

@jrbourbeau jrbourbeau merged commit 293e6c1 into dask:master Mar 3, 2021
@crusaderky crusaderky deleted the split_every branch March 3, 2021 11:19
dcherian added a commit to dcherian/dask that referenced this pull request Mar 8, 2021
* upstream/master: (43 commits)
  bump version to 2021.03.0
  Bump minimum version of distributed (dask#7328)
  Fix `percentiles_summary` with `dask_cudf` (dask#7325)
  Temporarily revert recent Array.__setitem__ updates (dask#7326)
  Blockwise.clone (dask#7312)
  NEP-35 duck array update (dask#7321)
  Don't allow setting `.name` for array (dask#7222)
  Use nearest interpolation for creating percentiles of integer input (dask#7305)
  Test `exp` with CuPy arrays (dask#7322)
  Check that computed chunks have right size and dtype (dask#7277)
  pytest.mark.flaky (dask#7319)
  Contributing docs: add note to pull the latest git tags before pip installing Dask (dask#7308)
  Support for Python 3.9 (dask#7289)
  Add broadcast-based merge implementation (dask#7143)
  Add split_every to graph_manipulation (dask#7282)
  Typo in optimize docs (dask#7306)
  dask.graph_manipulation support for xarray.Dataset (dask#7276)
  Add plot width and height support for Bokeh 2.3.0 (dask#7297)
  Add numpy functions tri, triu_indices, triu_indices_from, tril_indices, tril_indices_from (dask#6997)
  Remove "cleanup" task in dataframe on-disk shuffle. The partd directory (dask#7260)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants