[Data] Add configurable batching for resolve_block_refs to speed up iter_batches #58467
Conversation
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Code Review
This pull request introduces batching for resolve_block_refs to improve the performance of iter_batches by reducing the number of ray.get() calls. The batch size is made configurable through a new DataContext setting. The implementation is sound and includes a good test case to verify the batching behavior. I have one suggestion to improve code conciseness by using yield from.
```python
for block_ref in block_ref_iter:
    pending.append(block_ref)
    if len(pending) >= batch_size:
        for block in _resolve_pending():
            yield block

for block in _resolve_pending():
    yield block
```
The logic for yielding blocks from _resolve_pending is duplicated. You can simplify this by using yield from to make the code more concise and avoid repetition.
Suggested change:

```diff
 for block_ref in block_ref_iter:
     pending.append(block_ref)
     if len(pending) >= batch_size:
-        for block in _resolve_pending():
-            yield block
-for block in _resolve_pending():
-    yield block
+        yield from _resolve_pending()
+yield from _resolve_pending()
```
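The pattern under review can be shown as a minimal, self-contained sketch. All names here are illustrative, and `get_fn` stands in for `ray.get`, which resolves a whole list of object refs in one call:

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def resolve_in_batches(
    refs: Iterator[T],
    batch_size: int,
    get_fn: Callable[[List[T]], List[T]],
) -> Iterator[T]:
    """Yield resolved objects, issuing one get_fn call per batch_size refs.

    Illustrative sketch of the batching pattern; not the actual Ray code.
    """
    pending: List[T] = []

    def _resolve_pending() -> Iterator[T]:
        nonlocal pending
        if pending:
            # One bulk fetch for the whole batch instead of one per ref.
            yield from get_fn(pending)
            pending = []

    for ref in refs:
        pending.append(ref)
        if len(pending) >= batch_size:
            yield from _resolve_pending()
    # Flush any leftover refs smaller than a full batch.
    yield from _resolve_pending()
```

With `batch_size=2` and five refs, `get_fn` is invoked three times (batches of 2, 2, and 1) rather than five.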
```diff
 if batch_size is None or current_window_size < num_rows_to_prefetch:
     try:
-        next_ref_bundle = get_next_ref_bundle()
+        next_ref_bundle = next(ref_bundles)
```
Bug: RefBundle Retrieval Observability Gap
The removal of the get_next_ref_bundle() helper function eliminates tracking of stats.iter_get_ref_bundles_s timing metrics. The direct calls to next(ref_bundles) at lines 371 and 384 no longer wrap the operation with the stats timer, causing loss of observability for RefBundle retrieval time which was previously tracked and reported in iteration statistics.
Bug: Phantom Constant Breaks Imports
The constant DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR is removed but it's still imported and used in actor_pool_map_operator.py and test_operators.py. This will cause an ImportError when those modules try to import this constant from ray.data.context.
Referenced: `python/ray/data/context.py`, lines 217 to 221 (commit 17d88de).
srinathk10 left a comment:
@YoussefEssDS The motivation for the changes looks good. Please address the review comments.
Also, w.r.t. your micro-benchmark, please add the results as a comment here, describing your test setup.
```python
self._eager_free = clear_block_after_read and ctx.eager_free
max_get_blocks_batch_size = max(1, (prefetch_batches or 0) + 1)
self._block_get_batch_size = min(
    ctx.iter_get_block_batch_size, max_get_blocks_batch_size
)
```
Bug: Overly Conservative Batching Limits Performance
The calculation of _block_get_batch_size overly restricts batching by limiting it to prefetch_batches + 1 blocks. With default settings (prefetch_batches=1, iter_get_block_batch_size=32), this results in batching only 2 blocks at a time instead of the configured 32, significantly reducing the performance benefit. The formula max(1, (prefetch_batches or 0) + 1) creates a cap that's too conservative since prefetch_batches measures batches (not blocks), and their relationship varies with block size. This causes the configured iter_get_block_batch_size to be silently overridden in most cases.
Relaxing that cap breaks the backpressure tests, as it forces the materialization of more blocks than the configured prefetch size.
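The cap being debated is easy to reproduce in isolation. The function below restates the two-line formula from the diff (names are taken from the snippet; the function itself is an illustrative extraction, not Ray code):

```python
from typing import Optional


def effective_block_get_batch_size(
    prefetch_batches: Optional[int],
    iter_get_block_batch_size: int,
) -> int:
    """Reproduce the cap logic under discussion.

    Batching is limited to one block beyond the prefetch window so the
    iterator never materializes more blocks than backpressure allows.
    """
    max_get_blocks_batch_size = max(1, (prefetch_batches or 0) + 1)
    return min(iter_get_block_batch_size, max_get_blocks_batch_size)
```

With the defaults cited in the bug report (`prefetch_batches=1`, `iter_get_block_batch_size=32`), this returns 2, which is exactly the "only 2 blocks at a time instead of the configured 32" behavior described above.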
Hi @srinathk10, thanks for the review. I ran the micro-benchmark on a Ryzen 9 7950X / 64 GB RAM machine (Ubuntu 22.04, Python 3.12). Before the batching change: mean 3.82 s (p50 3.83 s, min 3.78 s, max 3.86 s) = 1.31 M rows/s over 4,883 batches. Net improvement: ~5% in end-to-end batch iteration throughput with prefetch set to 32. Both runs used the same script and dataset parameters on the same machine.
Train release tests: https://buildkite.com/ray-project/release/builds/67245
```diff
-    clear_block_after_read and DataContext.get_current().eager_free
+ctx = DataContext.get_current()
+self._eager_free = clear_block_after_read and ctx.eager_free
+max_get_blocks_batch_size = max(1, (prefetch_batches or 0) + 1)
```
prefetch_batches is the number of batches to prefetch, not blocks.
The actual number of blocks to prefetch is calculated in BlockPrefetcher.
We can add a method to let it report the number of blocks being prefetched.
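The reviewer's suggestion could look roughly like the sketch below: give the prefetcher a reporting method so callers can size their `ray.get()` batches in blocks rather than guessing from `prefetch_batches`. The class and method names here are hypothetical, not Ray's actual `BlockPrefetcher` interface:

```python
from typing import Any, List


class SketchBlockPrefetcher:
    """Hypothetical prefetcher that can report how many blocks it holds."""

    def __init__(self) -> None:
        self._prefetched_blocks: List[Any] = []

    def prefetch(self, block_refs: List[Any]) -> None:
        # In real Ray, this would trigger object transfers; here we
        # only track which refs are in the prefetch window.
        self._prefetched_blocks.extend(block_refs)

    def num_blocks_prefetched(self) -> int:
        """Report the current window size in blocks, so the iterator can
        cap its get-batch size without overshooting backpressure."""
        return len(self._prefetched_blocks)
```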
```python
hits += current_hit
misses += current_miss
unknowns += current_unknown
ctx = ray.data.context.DataContext.get_current()
```
Pass in the correct context object; avoid using the global one.
Hi @raulchen, is this what you had in mind? Any further suggestions?
Hi @raulchen, PTAL. Thanks!
```python
block_ref_iter: Iterator[ObjectRef[Block]],
stats: Optional[DatasetStats] = None,
max_get_batch_size: Optional[Union[int, Callable[[], int]]] = None,
ctx: Optional["DataContext"] = None,
```
Can we make this mandatory?
```diff
 self._eager_free = (
     clear_block_after_read and DataContext.get_current().eager_free
 )
+self._ctx = DataContext.get_current()
```
Ideally this ctx should be passed from Dataset._context.
But since it's an existing issue, you can leave a TODO here if it requires a massive change.
@raulchen PTAL. Thanks!
Hi @raulchen, just bumping this. Can you check if any further changes are needed? Thanks!
Hi @bveeramani, can we get this over the line? It's approved by the reviewers. Thanks!
@YoussefEssDS merged. Thank you for the contribution!
…ter_batches (ray-project#58467) ## Description This PR will: - Batch block resolution in `resolve_block_refs()` so `iter_batches()` issues one `ray.get()` per chunk of block refs instead of per ref. The chunk size is configurable using the new `DataContext.iter_get_block_batch_size` knob. - Added a test that proves that `resolve_block_refs()` actually batches the `ray.get()` calls. ## Related issues Raised by @amogkam in `python/ray/data/_internal/block_batching/util.py` ## Additional information Simple benchmark available: https://gist.github.com/YoussefEssDS/40de959a42a19334b8dac8bd217c319b --------- Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
…eed up iter_batches (ray-project#58467)" This reverts commit 2a042d4. (ray-project#60114) Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Description

This PR will:

- Batch block resolution in `resolve_block_refs()` so `iter_batches()` issues one `ray.get()` per chunk of block refs instead of per ref. The chunk size is configurable using the new `DataContext.iter_get_block_batch_size` knob.
- Added a test that proves that `resolve_block_refs()` actually batches the `ray.get()` calls.

Related issues

Raised by @amogkam in `python/ray/data/_internal/block_batching/util.py`

Additional information

Simple benchmark available: https://gist.github.com/YoussefEssDS/40de959a42a19334b8dac8bd217c319b
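Tuning the knob would look roughly like the following. Since the change was later reverted in ray-project#60114, the field may not exist in current Ray releases, so this sketch mocks the context object rather than importing Ray; the default of 32 is an assumption for illustration:

```python
from dataclasses import dataclass


@dataclass
class MockDataContext:
    """Toy stand-in for ray.data.DataContext (not the real class),
    showing how the knob added by this PR would be surfaced."""

    iter_get_block_batch_size: int = 32  # assumed default, for illustration


ctx = MockDataContext()
# Resolve at most 8 block refs per ray.get() call during iter_batches().
ctx.iter_get_block_batch_size = 8
```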