Add optional IO-subgraph to Blockwise Layers #6715
Update: While 8d371ce demonstrates low-level task fusion of IO and (traditional) blockwise operations, the most recent commit goes a bit further and pushes IO-function calls into the same I assume it is best (for scheduler logic) to fuse all operations into the same |
It seems that (at least some of) the CI failures are related to fsspec#458.
EDIT: It seems that the issue was in aiohttp 3.7.0 and has been resolved in 3.7.1.
jrbourbeau
left a comment
Thanks for all your work here @rjzamora! The test you've added does a really nice job of demonstrating blockwise layer optimizations
dask/blockwise.py
Outdated
# Extract actual IO function for SubgraphCallable construction.
# Wrap func in `PackedFunctionCall`, since it will receive
# all arguments as a single (packed) tuple at run time.
io_func = self.io_subgraph.get((self.io_name, 0), (None,))[0]
We should document whatever assumptions we're making about the structure of io_subgraph
I made some minor tweaks here and added notes in the docstring and below to clarify the assumptions. Let me know what you think.
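For readers following this thread, here is a minimal sketch of the structure the quoted lookup appears to assume: every key `(io_name, i)` maps to a task tuple whose first element is the same IO function, so inspecting partition 0 is enough to recover it. The names `read_partition`, `PackedCall`, and the layer name are hypothetical and are not dask's own; in particular, `PackedCall` only illustrates the "single packed tuple" calling convention mentioned in the code comment and is not dask's `PackedFunctionCall`.

```python
# Hypothetical IO function standing in for a real reader (not part of dask).
def read_partition(path, columns):
    return {"path": path, "columns": columns}


class PackedCall:
    """Hypothetical wrapper: receives all arguments as one packed tuple."""

    def __init__(self, func):
        self.func = func

    def __call__(self, packed_args):
        # Unpack the single tuple into positional arguments.
        return self.func(*packed_args)


io_name = "read-parquet-example"  # hypothetical layer name

# Assumed structure: one task tuple per partition, all sharing the
# same function as their first element.
io_subgraph = {
    (io_name, i): (read_partition, f"part.{i}.parquet", ["a", "b"])
    for i in range(3)
}

# Same lookup as in the quoted diff: take the function from partition 0,
# falling back to None if the key is missing.
io_func = io_subgraph.get((io_name, 0), (None,))[0]
missing_func = {}.get((io_name, 0), (None,))[0]

# The wrapped function is called with the remaining task elements
# packed into a single tuple.
packed = PackedCall(io_func)
results = [packed(io_subgraph[(io_name, i)][1:]) for i in range(3)]
```

If different partitions were allowed to use different functions, looking only at key `(io_name, 0)` would no longer be sufficient, which is one reason the assumption is worth documenting.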
graph = optimize_blockwise(ddf.__dask_graph__(), keys)
layers = graph.layers
name = list(layers.keys())[0]
assert len(layers) == 1
Nice! This is good to see : )
# TODO: Handle N-D Collections and more-complex
# tensor operations.
Could you open up an issue with checkboxes to track follow-up work we'll want to do to ensure we're using IO-subgraphs throughout the codebase?
jrbourbeau
left a comment
Thanks @rjzamora! This is in
Big Picture
Blockwise HighLevelGraph (HLG) layers currently require a collection dependency. This has prevented IO (data-generation) operations from being represented as Blockwise objects. This, in turn, has prevented optimize_blockwise from fusing IO tasks with follow-up block-wise transformations. This is fine if/when we can use fuse on the full task-graph dictionary, but not if we hope to send HLGs directly to the scheduler.
This PR makes it possible to perform IO from within a Blockwise layer (by passing in a special IO subgraph), and introduces a new BlockwiseParquetLayer. These changes demonstrate that optimize_blockwise behaves correctly for IO-enabled layers, without complicating optimize_read_parquet_getitem.
TODO:
subgraph_callable (may make future optimizations cleaner)
Try to avoid IO-subgraph dict materialization when culling and/or getting dependencies
read_csv)
from_callable API, and remove or update the related test
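To make the motivation concrete, here is a simplified, dask-free sketch of what fusing an IO task with a follow-up block-wise transformation means at the task level. It uses dask-style task tuples `(func, *args)` and a tiny evaluator written for this illustration; `load`, `double`, and `get` are hypothetical helpers, and this is not how optimize_blockwise is implemented in the PR.

```python
def load(i):
    """Hypothetical IO function: produce partition i."""
    return list(range(i, i + 3))


def double(block):
    """Hypothetical block-wise transformation."""
    return [2 * x for x in block]


def get(graph, key):
    """Tiny recursive evaluator for dask-style task tuples (illustration only)."""

    def is_key(obj):
        try:
            return obj in graph
        except TypeError:  # unhashable objects cannot be keys
            return False

    def evaluate(obj):
        if isinstance(obj, tuple) and obj and callable(obj[0]):
            # A task: evaluate its arguments, then call the function.
            return obj[0](*(evaluate(arg) for arg in obj[1:]))
        if is_key(obj):
            # A reference to another task in the graph.
            return evaluate(graph[obj])
        return obj  # a plain literal

    return evaluate(graph[key])


npartitions = 2

# Unfused: the IO task and the transformation are separate tasks,
# so each partition contributes two keys to the graph.
unfused = {}
for i in range(npartitions):
    unfused[("read", i)] = (load, i)
    unfused[("double", i)] = (double, ("read", i))

# Fused: the IO call is inlined into the transformation task,
# leaving one task per output partition.
fused = {("double", i): (double, (load, i)) for i in range(npartitions)}

unfused_result = get(unfused, ("double", 1))
fused_result = get(fused, ("double", 1))
```

Both graphs compute the same result for each output key, but the fused graph has half as many tasks. Fusing on the materialized task dictionary achieves this too; the point of the PR description is that doing it at the layer level avoids needing the full dictionary first.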