[Datasets] Support different number of blocks/rows per block in zip().#32795
[Datasets] Support different number of blocks/rows per block in zip().#32795ericl merged 3 commits intoray-project:masterfrom
Conversation
| super().__init__("zip", None, do_zip_all) | ||
|
|
||
|
|
||
| def _do_zip(block: Block, *other_blocks: Block) -> (Block, BlockMetadata): |
There was a problem hiding this comment.
Annotate other_blocks as List[Block] for readability?
There was a problem hiding this comment.
Variadic args should have the type annotation of a single one of the arguments, not the collection. https://peps.python.org/pep-0484/#arbitrary-argument-lists-and-default-argument-values
There was a problem hiding this comment.
It's List[ObjectRef[Block] from splitting output, any reason this to be a variadic arg?
There was a problem hiding this comment.
This is used as a Ray task function, and Ray only resolves object refs that are top-level arguments, so we want to have each of these data blocks as top-level arguments to (1) get automatic materialization, (2) ensure the task isn't scheduled until all blocks are resolved, and (3) take advantage of locality-aware scheduling of the task; we wouldn't get any of those 3 things if we did a ray.get() in the task function, if that's what you're recommending.
We use this same pattern elsewhere, whenever we send a variable number of data blocks to a Ray task, we destructure it into a variadic arg so all of the above happens.
|
btw it would be great if we can create a benchmark for zip() from we learned. |
| super().__init__("zip", None, do_zip_all) | ||
|
|
||
|
|
||
| def _do_zip(block: Block, *other_blocks: Block) -> (Block, BlockMetadata): |
There was a problem hiding this comment.
It's List[ObjectRef[Block] from splitting output, any reason this to be a variadic arg?
ray-project#32795) This PR adds support for a different number of blocks/rows per block in `ds1.zip(ds2)`, by aligning the blocks in `ds2` to `ds1` with a lightweight repartition/block splitting. ## Design We heavily utilize the block splitting machinery that's use for `ds.split()` and `ds.split_at_indices()` to avoid an overly expensive repartition. Namely, for `ds1.zip(ds2)`, we: 1. Calculate the block sizes for `ds1` in order to get split indices. 2. Apply `_split_at_indices()` to `ds2` in order to get a list of `ds2` block chunks for every block in `ds1`, such that `self_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks)` for every `self_block` in `ds1`. 3. Zip together each block in `ds1` with the one or more blocks from `ds2` that constitute the block-aligned split for that `ds1` block.
#32795) (#32998) This PR adds support for a different number of blocks/rows per block in `ds1.zip(ds2)`, by aligning the blocks in `ds2` to `ds1` with a lightweight repartition/block splitting. ## Design We heavily utilize the block splitting machinery that's use for `ds.split()` and `ds.split_at_indices()` to avoid an overly expensive repartition. Namely, for `ds1.zip(ds2)`, we: 1. Calculate the block sizes for `ds1` in order to get split indices. 2. Apply `_split_at_indices()` to `ds2` in order to get a list of `ds2` block chunks for every block in `ds1`, such that `self_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks)` for every `self_block` in `ds1`. 3. Zip together each block in `ds1` with the one or more blocks from `ds2` that constitute the block-aligned split for that `ds1` block.
ray-project#32795) This PR adds support for a different number of blocks/rows per block in `ds1.zip(ds2)`, by aligning the blocks in `ds2` to `ds1` with a lightweight repartition/block splitting. ## Design We heavily utilize the block splitting machinery that's use for `ds.split()` and `ds.split_at_indices()` to avoid an overly expensive repartition. Namely, for `ds1.zip(ds2)`, we: 1. Calculate the block sizes for `ds1` in order to get split indices. 2. Apply `_split_at_indices()` to `ds2` in order to get a list of `ds2` block chunks for every block in `ds1`, such that `self_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks)` for every `self_block` in `ds1`. 3. Zip together each block in `ds1` with the one or more blocks from `ds2` that constitute the block-aligned split for that `ds1` block. Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
ray-project#32795) This PR adds support for a different number of blocks/rows per block in `ds1.zip(ds2)`, by aligning the blocks in `ds2` to `ds1` with a lightweight repartition/block splitting. ## Design We heavily utilize the block splitting machinery that's use for `ds.split()` and `ds.split_at_indices()` to avoid an overly expensive repartition. Namely, for `ds1.zip(ds2)`, we: 1. Calculate the block sizes for `ds1` in order to get split indices. 2. Apply `_split_at_indices()` to `ds2` in order to get a list of `ds2` block chunks for every block in `ds1`, such that `self_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks)` for every `self_block` in `ds1`. 3. Zip together each block in `ds1` with the one or more blocks from `ds2` that constitute the block-aligned split for that `ds1` block.
ray-project#32795) This PR adds support for a different number of blocks/rows per block in `ds1.zip(ds2)`, by aligning the blocks in `ds2` to `ds1` with a lightweight repartition/block splitting. ## Design We heavily utilize the block splitting machinery that's use for `ds.split()` and `ds.split_at_indices()` to avoid an overly expensive repartition. Namely, for `ds1.zip(ds2)`, we: 1. Calculate the block sizes for `ds1` in order to get split indices. 2. Apply `_split_at_indices()` to `ds2` in order to get a list of `ds2` block chunks for every block in `ds1`, such that `self_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks)` for every `self_block` in `ds1`. 3. Zip together each block in `ds1` with the one or more blocks from `ds2` that constitute the block-aligned split for that `ds1` block. Signed-off-by: elliottower <elliot@elliottower.com>
This PR adds support for a different number of blocks/rows per block in
ds1.zip(ds2), by aligning the blocks inds2tods1with a lightweight repartition/block splitting.Design
We heavily utilize the block splitting machinery that's use for
ds.split()andds.split_at_indices()to avoid an overly expensive repartition. Namely, fords1.zip(ds2), we:ds1in order to get split indices._split_at_indices()tods2in order to get a list ofds2block chunks for every block inds1, such thatself_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks)for everyself_blockinds1.ds1with the one or more blocks fromds2that constitute the block-aligned split for thatds1block.Related issue number
Closes #32375
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.