
Add optional IO-subgraph to Blockwise Layers #6715

Merged
jrbourbeau merged 155 commits into dask:master from rjzamora:blockwise-experimental
Nov 4, 2020
Conversation

@rjzamora (Member) commented Oct 8, 2020

Big Picture

Blockwise HighLevelGraph (HLG) layers currently require a collection dependency. This has prevented IO (data-generation) operations from being represented as Blockwise objects. This, in turn, has prevented optimize_blockwise from fusing IO tasks with follow-up block-wise transformations. This is fine if/when we can use fuse on the full task-graph dictionary, but not if we hope to send HLGs directly to the scheduler.

This PR makes it possible to perform IO from within a Blockwise layer (by passing in a special IO subgraph), and introduces a new BlockwiseParquet Layer. These changes demonstrate that optimize_blockwise behaves correctly for IO-enabled layers, without complicating optimize_read_parquet_getitem.
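To illustrate what "fusing IO tasks with follow-up block-wise transformations" means, here is a minimal sketch in plain Python. It does not use dask's Blockwise or optimize_blockwise; the `read_partition`, `double`, and `execute` names are hypothetical stand-ins, and the graphs only mimic the dask convention of `(function, *args)` task tuples keyed by `(name, partition_index)`.

```python
def read_partition(i):
    # Hypothetical stand-in for an IO function (e.g. reading one file).
    return list(range(i * 3, i * 3 + 3))


def double(part):
    # Hypothetical block-wise transformation applied to one partition.
    return [2 * x for x in part]


def execute(task, graph):
    """Tiny recursive evaluator for dask-style task tuples (illustration only)."""
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        return func(*(execute(a, graph) for a in args))
    if isinstance(task, tuple) and task in graph:
        # The tuple is a key: evaluate the task it points to.
        return execute(graph[task], graph)
    return task  # plain literal argument


npartitions = 2

# Unfused: one IO task plus one transformation task per partition.
unfused = {}
for i in range(npartitions):
    unfused[("read", i)] = (read_partition, i)
    unfused[("double", i)] = (double, ("read", i))

# Fused: the IO call is nested inside the transformation task,
# so each partition is a single task.
fused = {("double", i): (double, (read_partition, i)) for i in range(npartitions)}

unfused_result = [execute(("double", i), unfused) for i in range(npartitions)]
fused_result = [execute(("double", i), fused) for i in range(npartitions)]
```

Both graphs compute the same partitions, but the fused graph has half as many tasks, which is the property the PR aims to preserve at the HighLevelGraph level.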

TODO:

  • Add low-level fusion of blockwise io and transition tasks
  • Fuse IO into subgraph_callable (may make future optimizations cleaner)
  • Try to avoid IO-subgraph dict materialization when culling and/or getting dependencies
  • Try to avoid "no-op" tasks in IO-only Blockwise layers
  • Update other IO operations to leverage Blockwise (e.g. read_csv)
  • Remove from_callable API, and remove or update the related test
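As a rough illustration of the "avoid IO-subgraph dict materialization" item, an IO subgraph could be a lazy `Mapping` that builds a task only when its key is requested. The sketch below is an assumption about one possible shape, not the code in this PR; `LazyIOSubgraph` and `read_partition` are hypothetical names.

```python
from collections.abc import Mapping


def read_partition(path):
    # Hypothetical IO function; a real one would read the file at `path`.
    return f"data from {path}"


class LazyIOSubgraph(Mapping):
    """Mapping of (name, i) -> (io_func, path) that never stores the tasks."""

    def __init__(self, name, paths):
        self.name = name
        self.paths = paths
        self.materialized = 0  # number of tasks built so far

    def __getitem__(self, key):
        try:
            name, i = key
        except (TypeError, ValueError):
            raise KeyError(key) from None
        valid_index = isinstance(i, int) and 0 <= i < len(self.paths)
        if name != self.name or not valid_index:
            raise KeyError(key)
        self.materialized += 1
        return (read_partition, self.paths[i])

    def __len__(self):
        return len(self.paths)

    def __iter__(self):
        for i in range(len(self.paths)):
            yield (self.name, i)


subgraph = LazyIOSubgraph("read-parquet", [f"part.{i}.parquet" for i in range(1000)])

# "Culling": only the requested partitions are ever turned into tasks.
needed = [("read-parquet", 3), ("read-parquet", 7)]
culled = {key: subgraph[key] for key in needed}
```

Here culling to two output partitions builds two tasks rather than a thousand; calling `dict(subgraph)` would still materialize everything, which is the case the TODO item wants to avoid.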

@rjzamora rjzamora marked this pull request as ready for review October 17, 2020 03:35
@rjzamora (Member, Author) commented

Update: While 8d371ce demonstrates low-level task fusion of IO and (traditional) blockwise operations, the most recent commit goes a bit further and pushes IO-function calls into the same subgraph_callable as the other blockwise operations.

I assume it is best (for scheduler logic) to fuse all operations into the same subgraph_callable definition, but this may be a bit more restrictive on the properties of the IO subgraph: we need subgraph.get((<io_name>, <partition_index>)) to return a tuple whose first element is an "IO function".
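The structural requirement described above can be sketched as follows. This is a simplified illustration, not dask's implementation: `read_rows` and `PackedCall` are hypothetical, and the lookup with a `(None,)` default mirrors the snippet quoted in the review further down this thread.

```python
def read_rows(path, columns):
    # Hypothetical IO function; returns a description instead of reading a file.
    return {"path": path, "columns": columns}


io_name = "read-rows"

# Assumed structure: every value is a tuple whose first element is the
# IO function and whose remaining elements are that partition's arguments.
io_subgraph = {
    (io_name, 0): (read_rows, "part.0.csv", ["a", "b"]),
    (io_name, 1): (read_rows, "part.1.csv", ["a", "b"]),
}

# Extract the IO function from partition 0; fall back to None if it is missing.
io_func = io_subgraph.get((io_name, 0), (None,))[0]
missing_func = {}.get((io_name, 0), (None,))[0]


class PackedCall:
    """Illustrative wrapper: call `func` with arguments packed in one tuple."""

    def __init__(self, func):
        self.func = func

    def __call__(self, packed_args):
        return self.func(*packed_args)


packed_read = PackedCall(io_func)

# At run time each partition passes its remaining tuple elements as one argument.
results = [packed_read(io_subgraph[(io_name, i)][1:]) for i in range(2)]
```

Because only partition 0 is inspected, this pattern implicitly assumes every partition uses the same IO function, which is one of the restrictions on the IO subgraph mentioned above.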

@rjzamora (Member, Author) commented Oct 25, 2020

It seems that (at least some of) the CI failures are related to fsspec#458

EDIT: It seems that the issue was in aiohttp 3.7.0 and has been resolved in 3.7.1

@rjzamora rjzamora changed the title [WIP] Add optional IO-subgraph to Blockwise Layers Add optional IO-subgraph to Blockwise Layers Oct 27, 2020
@jrbourbeau (Member) left a comment

Thanks for all your work here @rjzamora! The test you've added does a really nice job of demonstrating blockwise layer optimizations

# Extract actual IO function for SubgraphCallable construction.
# Wrap func in `PackedFunctionCall`, since it will receive
# all arguments as a single (packed) tuple at run time.
io_func = self.io_subgraph.get((self.io_name, 0), (None,))[0]
Member

We should document whatever assumptions we're making about the structure of io_subgraph

Member Author

I made some minor tweaks here and added notes in the docstring and below to clarify the assumptions. Let me know what you think.

graph = optimize_blockwise(ddf.__dask_graph__(), keys)
layers = graph.layers
name = list(layers.keys())[0]
assert len(layers) == 1
Member

Nice! This is good to see : )

Comment on lines +296 to +297
# TODO: Handle N-D Collections and more-complex
# tensor operations.
Member

Could you open up an issue with checkboxes to track the follow-up work we'll want to do to ensure we're using IO-subgraphs throughout the codebase?

Member Author

I added #6791 (to track the use of IO subgraphs) and #6792 (to track improvements in this particular method) - Let me know if you had anything else in mind. Also, feel free to modify those issues in any way you'd like :)

@jrbourbeau (Member) left a comment

Thanks @rjzamora! This is in

@jrbourbeau jrbourbeau merged commit 5589bfd into dask:master Nov 4, 2020
@rjzamora rjzamora deleted the blockwise-experimental branch May 21, 2024 00:04

4 participants