Skip to content

Use Blockwise for DataFrame IO (parquet, csv, and orc)#7415

Merged
jrbourbeau merged 90 commits intodask:mainfrom
rjzamora:blockwise-io-dataframe
Apr 29, 2021
Merged

Use Blockwise for DataFrame IO (parquet, csv, and orc)#7415
jrbourbeau merged 90 commits intodask:mainfrom
rjzamora:blockwise-io-dataframe

Conversation

@rjzamora
Copy link
Member

@rjzamora rjzamora commented Mar 18, 2021

Superceeds #7042
Blocked by #7381

  • Changes Blockwise to require the elements of io_deps to inherit from the BlockwiseDep class (previously called BlockwiseIODeps)
  • Allows elements of io_deps to be passed in via the indices argument (as BlockwiseDep objects) instead. These non-string objects are currently stripped out and added to self.io_deps. However, there is no reason that we cannot take the approach suggested in [Idea] Alternative API for Blockwise io_deps #7513 in a follow-up PR, and remove self.io_deps altogether.
  • Introduces dask.layers.DataFrameLayer and dask.layers.DataFrameIOLayer to simplify the construction of Blockwise-based DataFrame layers for IO.
    • Introducing DataFrameLayer generalizes the existing read_parquet/getitem optimization. The getitem optimization now works for read_parquet, read_orc and read_csv (and can be extended to other IO operations that can target a subset of columns).
    • The purpose of DataFrameIOLayer is to avoid code duplication
  • Refactors read_parquet to use Blockwise.
    • A new large_graph_objects= option (default False) has been added to read_parquet. However, due to serialization improvements in other PRs, this change can be rolled back if necessary.
  • Refactors read_orc to use Blockwise.
  • Refactors read_csv to use Blockwise.

jrbourbeau and others added 30 commits March 11, 2021 19:16
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@rjzamora rjzamora marked this pull request as ready for review April 23, 2021 13:13
@rjzamora
Copy link
Member Author

@gjoseph92 @ian-r-rose @jrbourbeau - Thank you for all the help with Blockwise-IO stuff so far! I am hoping to get this merged early in the current release cycle (if possible). In fact, I'd like to follow up with a quick PR to set the "optimize.fuse.active" default to False for Dataframe. Let me know if you think this plan is too aggressive :)

Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks really good to me. I'd be comfortable with seeing it merged right now; at this point, I just have a couple little nits.

The answer to this could definitely be "let's do it later", but what do you think about adding a little more documentation about the expectations and invariants in this BlockwiseDep system?

  • Docstrings for base BlockwiseDep methods saying what they will be called with and are expected to return
  • Example of io_deps behavior in make_blockwise_graph docstring

Could be nice to get it out of the way now, so it's easy to reference for all of us as we add to the Blockwise system and start forgetting about the implementation details.

Copy link
Collaborator

@ian-r-rose ian-r-rose left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking good @rjzamora. I have a few comments, mostly around documentation/clarification

# in `self.indices` throughout `Blockwise`.
self.indices = []
self.numblocks = numblocks
self.io_deps = io_deps or {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it ever make sense to have some io_deps specified via indices, and others via io_deps? Or do we view this mostly as a bridge to removing io_deps?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do we view this mostly as a bridge to removing io_deps?

Right - This is mostly a bridge. I don't think it makes sense for io_deps to be populated if there are BlockwiseDep objects in indices - But I haven't really though carefully about more-complex workflows with fusion, etc.

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your work on this @rjzamora and @gjoseph92 @ian-r-rose for the detailed review

@jrbourbeau jrbourbeau merged commit 7893616 into dask:main Apr 29, 2021
@gjoseph92
Copy link
Collaborator

Awesome work @rjzamora!

@rjzamora
Copy link
Member Author

Thanks @gjoseph92 @ian-r-rose @jrbourbeau - This PR would have been much uglier without all your help :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants