Use Blockwise for DataFrame IO (parquet, csv, and orc) #7415
jrbourbeau merged 90 commits into dask:main
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@gjoseph92 @ian-r-rose @jrbourbeau - Thank you for all the help with Blockwise-IO stuff so far! I am hoping to get this merged early in the current release cycle (if possible). In fact, I'd like to follow up with a quick PR to set the "optimization.fuse.active" default to False for DataFrame. Let me know if you think this plan is too aggressive :)
gjoseph92
left a comment
This looks really good to me. I'd be comfortable with seeing it merged right now; at this point, I just have a couple little nits.
The answer to this could definitely be "let's do it later", but what do you think about adding a little more documentation about the expectations and invariants in this BlockwiseDep system?
- Docstrings for base `BlockwiseDep` methods saying what they will be called with and are expected to return
- Example of `io_deps` behavior in `make_blockwise_graph` docstring
Could be nice to get it out of the way now, so it's easy to reference for all of us as we add to the Blockwise system and start forgetting about the implementation details.
ian-r-rose
left a comment
This is looking good @rjzamora. I have a few comments, mostly around documentation/clarification
    # in `self.indices` throughout `Blockwise`.
    self.indices = []
    self.numblocks = numblocks
    self.io_deps = io_deps or {}
Does it ever make sense to have some io_deps specified via indices, and others via io_deps? Or do we view this mostly as a bridge to removing io_deps?
Or do we view this mostly as a bridge to removing io_deps?
Right - This is mostly a bridge. I don't think it makes sense for io_deps to be populated if there are BlockwiseDep objects in indices - But I haven't really thought carefully about more complex workflows with fusion, etc.
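For reference, here is a toy sketch of the bridge being discussed: dependency objects arrive through the indices argument, get stripped out, and end up in io_deps. It is not dask's implementation. `ToyBlockwiseDep`, `split_indices`, and the `_dep_N` placeholder naming are hypothetical names made up for illustration, and the real `Blockwise` constructor may differ in the details.

```python
class ToyBlockwiseDep:
    """Hypothetical stand-in for a BlockwiseDep-like object (not dask's class).

    Maps a block index tuple to the argument a task should receive.
    """

    def __init__(self, mapping):
        self.mapping = dict(mapping)

    def __getitem__(self, idx):
        return self.mapping[idx]


def split_indices(indices, io_deps=None):
    """Strip dependency objects out of (arg, index) pairs.

    String args (collection names) are kept unchanged. Each ToyBlockwiseDep
    is replaced by a generated placeholder name (an assumption made here)
    and stored in the returned io_deps dict under that name.
    """
    io_deps = dict(io_deps or {})
    cleaned = []
    for pos, (arg, ind) in enumerate(indices):
        if isinstance(arg, ToyBlockwiseDep):
            name = f"_dep_{pos}"
            io_deps[name] = arg
            arg = name
        cleaned.append((arg, ind))
    return cleaned, io_deps


# Usage: one dependency object and one ordinary named collection.
parts = ToyBlockwiseDep({(0,): "part.0.parquet", (1,): "part.1.parquet"})
indices, io_deps = split_indices([(parts, "i"), ("other", "i")])
```

After the call, `indices` holds only string names, which is why passing deps via both routes at once would be redundant in this toy model.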
jrbourbeau
left a comment
Thanks for all your work on this @rjzamora and @gjoseph92 @ian-r-rose for the detailed review
Awesome work @rjzamora!
Thanks @gjoseph92 @ian-r-rose @jrbourbeau - This PR would have been much uglier without all your help :)
Supersedes #7042
Blocked by #7381

- Change `Blockwise` to require the elements of `io_deps` to inherit from the `BlockwiseDep` class (previously called `BlockwiseIODeps`)
- Allow `io_deps` to be passed in via the `indices` argument (as `BlockwiseDep` objects) instead. These non-string objects are currently stripped out and added to `self.io_deps`. However, there is no reason that we cannot take the approach suggested in "[Idea] Alternative API for Blockwise io_deps" (#7513) in a follow-up PR, and remove `self.io_deps` altogether.
- Add `dask.layers.DataFrameLayer` and `dask.layers.DataFrameIOLayer` to simplify the construction of `Blockwise`-based DataFrame layers for IO. `DataFrameLayer` generalizes the existing `read_parquet`/`getitem` optimization. The `getitem` optimization now works for `read_parquet`, `read_orc` and `read_csv` (and can be extended to other IO operations that can target a subset of columns). The purpose of `DataFrameIOLayer` is to avoid code duplication.
- Change `read_parquet` to use `Blockwise`. A `large_graph_objects=` option (default False) has been added to `read_parquet`. However, due to serialization improvements in other PRs, this change can be rolled back if necessary.
- Change `read_orc` to use `Blockwise`.
- Change `read_csv` to use `Blockwise`.
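
The generalized `getitem` optimization can be pictured with a small toy model: when a column selection directly follows an IO layer, the selection is pushed into the reader so only those columns are loaded. The sketch below is an illustration under stated assumptions, not the code in this PR. `ToyIOLayer`, its `project_columns` method, and `toy_read` are hypothetical names, and the reader just returns a string instead of touching files.

```python
class ToyIOLayer:
    """Hypothetical IO layer: one task per input partition.

    Every task calls `io_func(part, columns)`, so narrowing `columns`
    narrows what each task reads.
    """

    def __init__(self, name, columns, inputs, io_func):
        self.name = name
        self.columns = list(columns)
        self.inputs = list(inputs)
        self.io_func = io_func

    def project_columns(self, columns):
        """Return a new layer that reads only the requested columns."""
        missing = [c for c in columns if c not in self.columns]
        if missing:
            raise KeyError(f"unknown columns: {missing}")
        # The original layer is left untouched; the projected layer gets
        # a distinct name so the two cannot be confused in a graph.
        return ToyIOLayer(
            self.name + "-projected", columns, self.inputs, self.io_func
        )

    def tasks(self):
        """Materialize the layer as a {key: (func, part, columns)} dict."""
        return {
            (self.name, i): (self.io_func, part, self.columns)
            for i, part in enumerate(self.inputs)
        }


def toy_read(path, columns):
    # Stand-in for a reader that can target a subset of columns.
    return f"read {path} columns={columns}"


# Usage: a selection of column "a" is pushed down into the IO layer.
layer = ToyIOLayer("read", ["a", "b", "c"], ["p0", "p1"], toy_read)
small = layer.project_columns(["a"])
graph = small.tasks()
```

Any reader that accepts a column subset fits this pattern, which is why the same idea can cover parquet, orc, and csv.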