[data] Fix StreamingRepartition hang with empty upstream results #59848
bveeramani merged 1 commit into ray-project:master
Code Review
This pull request fixes a critical bug where StreamingRepartitionRefBundler would hang when processing empty datasets. The fix involves adding an early return in add_bundle for empty bundles. This correctly resolves the hang. A new test case is also added to verify the fix, which is comprehensive.
My main feedback is that while the fix is effective for the hang, it introduces a metrics inconsistency by dropping empty bundles that have already been queued for metrics. I've left a comment with a suggestion for a more robust fix that would also keep the metrics accurate.
When upstream produces empty results (0 rows), empty bundles get stuck in `_pending_bundles` and are never flushed.
Signed-off-by: dragongu <andrewgu@vip.qq.com>
Force-pushed from d51a386 to 3082ce5.
@dragongu I'm working on a large refactor that includes revamping the streaming ref bundler: #59093. I ran the reproduction script and it works on that PR. I'm OK with merging this now because my PR will not be merged in the near future.
…-project#59848)

# Fix StreamingRepartition hang with empty upstream results

## Summary

Fix a bug where `StreamingRepartitionRefBundler` would hang when processing empty datasets (0 rows).

## Problem

When upstream operations (e.g., `filter`, `map`) produce an empty result (0 rows), the resulting empty `RefBundle` is added to `_pending_bundles` but is never flushed because:

1. `add_bundle()` adds empty bundles (0 rows) to `_pending_bundles`
2. `_total_pending_rows` remains 0
3. `done_adding_bundles()` checks `len(_pending_bundles) > 0` and calls `_try_build_ready_bundle(flush_remaining=True)`
4. `_try_build_ready_bundle(flush_remaining=True)` checks `_total_pending_rows > 0` → False, so no flush happens
5. Empty bundles remain in `_pending_bundles` forever (memory leak)

## Reproduction

```python
import ray

ray.init()

# The filter removes every row, so the repartition sees an empty upstream result.
ds = ray.data.range(5).filter(lambda row: row['id'] > 100)
ds = ds.repartition(target_num_rows_per_block=8)
ds.count()  # Hangs before this fix.
```

## Solution

Changed the flush condition in `_try_build_ready_bundle()` from checking `_total_pending_rows > 0` to checking `len(self._pending_bundles) > 0`:

```python
# Before:
if flush_remaining and self._total_pending_rows > 0:

# After:
if flush_remaining and len(self._pending_bundles) > 0:
```

This ensures empty bundles are flushed out of `_pending_bundles` instead of staying there, preventing both hangs and memory leaks.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
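The five-step failure and the one-line fix can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `ToyBundle`, `ToyBundler`, the `flush_on_pending_bundles` switch, and `leftover_after_empty_input` are hypothetical names invented for illustration, and the class mirrors just the state named in the description (`_pending_bundles`, `_total_pending_rows`), not Ray's actual `StreamingRepartitionRefBundler`.

```python
from typing import List


class ToyBundle:
    """Hypothetical stand-in for a RefBundle; only the row count matters here."""

    def __init__(self, num_rows: int) -> None:
        self.num_rows = num_rows


class ToyBundler:
    """Simplified model of the bundler state described above (not Ray's code)."""

    def __init__(self, flush_on_pending_bundles: bool) -> None:
        # True models the condition after the fix, False the one before it.
        self.flush_on_pending_bundles = flush_on_pending_bundles
        self._pending_bundles: List[ToyBundle] = []
        self._total_pending_rows = 0
        self.ready: List[List[ToyBundle]] = []

    def add_bundle(self, bundle: ToyBundle) -> None:
        # Steps 1 and 2: an empty bundle is queued, but the row count stays 0.
        self._pending_bundles.append(bundle)
        self._total_pending_rows += bundle.num_rows

    def done_adding_bundles(self) -> None:
        # Step 3: a final flush is requested whenever anything is pending.
        if len(self._pending_bundles) > 0:
            self._try_build_ready_bundle(flush_remaining=True)

    def _try_build_ready_bundle(self, flush_remaining: bool = False) -> None:
        if self.flush_on_pending_bundles:
            should_flush = flush_remaining and len(self._pending_bundles) > 0
        else:
            # Step 4: with 0 pending rows this is False, so nothing is flushed.
            should_flush = flush_remaining and self._total_pending_rows > 0
        if should_flush:
            self.ready.append(self._pending_bundles)
            self._pending_bundles = []
            self._total_pending_rows = 0


def leftover_after_empty_input(fixed: bool) -> int:
    """Return how many bundles stay pending after one empty bundle is added."""
    bundler = ToyBundler(flush_on_pending_bundles=fixed)
    bundler.add_bundle(ToyBundle(num_rows=0))
    bundler.done_adding_bundles()
    return len(bundler._pending_bundles)


print(leftover_after_empty_input(fixed=False))  # 1: the empty bundle is stuck
print(leftover_after_empty_input(fixed=True))   # 0: the empty bundle is flushed
```

In this model the stuck bundle is what a downstream consumer would wait on indefinitely. Keying the flush on the number of pending bundles rather than the number of pending rows is what lets the final flush drain it.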
…-project#59848)
Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>