
[Datasets] Support different number of blocks/rows per block in zip().#32795

Merged
ericl merged 3 commits into ray-project:master from clarkzinzow:datasets/feat/enhance-zip
Feb 24, 2023

Conversation

@clarkzinzow
Contributor

This PR adds support for a different number of blocks/rows per block in ds1.zip(ds2), by aligning the blocks in ds2 to ds1 with a lightweight repartition/block splitting.

Design

We heavily utilize the block splitting machinery that's used for ds.split() and ds.split_at_indices() to avoid an overly expensive repartition. Namely, for ds1.zip(ds2), we:

  1. Calculate the block sizes for ds1 in order to get split indices.
  2. Apply _split_at_indices() to ds2 in order to get a list of ds2 block chunks for every block in ds1, such that self_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks) for every self_block in ds1.
  3. Zip together each block in ds1 with the one or more blocks from ds2 that constitute the block-aligned split for that ds1 block.
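The three steps above can be sketched in plain Python, using lists as stand-in "blocks" (the function and variable names below are illustrative, not the actual Ray Datasets internals):

```python
from itertools import accumulate
from typing import List


def zip_aligned(ds1_blocks: List[list], ds2_blocks: List[list]) -> List[list]:
    # Step 1: block sizes of ds1 give the split indices.
    split_indices = list(accumulate(len(b) for b in ds1_blocks))
    # Step 2: split ds2's rows at those indices, mirroring what
    # _split_at_indices() does with real block chunks, so each ds1 block
    # gets a row-count-aligned chunk of ds2.
    ds2_rows = [row for block in ds2_blocks for row in block]
    bounds = zip([0] + split_indices, split_indices)
    ds2_splits = [ds2_rows[start:end] for start, end in bounds]
    # Step 3: zip each ds1 block with its row-aligned ds2 split.
    return [list(zip(b1, b2)) for b1, b2 in zip(ds1_blocks, ds2_splits)]


# ds1 has blocks of sizes 2 and 3; ds2 has blocks of sizes 3 and 2.
result = zip_aligned([[1, 2], [3, 4, 5]], [["a", "b", "c"], ["d", "e"]])
```

Note that the real implementation keeps the ds2 chunks as separate blocks per split (hence "one or more blocks" in step 3) rather than concatenating rows as this sketch does.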

Related issue number

Closes #32375

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Contributor

@ericl ericl left a comment


LGTM at high level

@clarkzinzow
Contributor Author

Tests look ok, @c21 @jianoaix could one of y'all take a look?

super().__init__("zip", None, do_zip_all)


def _do_zip(block: Block, *other_blocks: Block) -> (Block, BlockMetadata):
Contributor


Annotate other_blocks as List[Block] for readability?

Contributor Author


Variadic args should have the type annotation of a single one of the arguments, not the collection. https://peps.python.org/pep-0484/#arbitrary-argument-lists-and-default-argument-values
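To illustrate the PEP 484 convention referenced above, a variadic parameter is annotated with the type of a single argument; inside the function body it is seen as a tuple of that type:

```python
def total(*values: int) -> int:
    # The annotation `int` describes each individual positional argument;
    # inside the body, `values` is a tuple[int, ...].
    return sum(values)


result = total(1, 2, 3)
```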

Contributor


It's List[ObjectRef[Block]] from the splitting output; any reason for this to be a variadic arg?

Contributor Author

@clarkzinzow clarkzinzow Feb 24, 2023


This is used as a Ray task function, and Ray only resolves object refs that are top-level arguments, so we want to have each of these data blocks as top-level arguments to (1) get automatic materialization, (2) ensure the task isn't scheduled until all blocks are resolved, and (3) take advantage of locality-aware scheduling of the task; we wouldn't get any of those 3 things if we did a ray.get() in the task function, if that's what you're recommending.

We use this same pattern elsewhere, whenever we send a variable number of data blocks to a Ray task, we destructure it into a variadic arg so all of the above happens.

Contributor


ok, makes sense!

@c21
Contributor

c21 commented Feb 24, 2023

btw it would be great if we could create a benchmark for zip() from what we learned.

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Feb 24, 2023

@ericl ericl merged commit e3f875c into ray-project:master Feb 24, 2023
clarkzinzow added a commit to clarkzinzow/ray that referenced this pull request Mar 3, 2023
zcin pushed a commit that referenced this pull request Mar 3, 2023
#32795) (#32998)
amogkam added a commit that referenced this pull request Mar 7, 2023
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
peytondmurray pushed a commit to peytondmurray/ray that referenced this pull request Mar 22, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023


Development

Successfully merging this pull request may close these issues.

[Datasets] Improve user experience of zip()

4 participants