Skip to content

[data] Fix StreamingRepartition hang with empty upstream results#59848

Merged
bveeramani merged 1 commit intoray-project:masterfrom
dragongu:fix/repartition
Jan 6, 2026
Merged

[data] Fix StreamingRepartition hang with empty upstream results#59848
bveeramani merged 1 commit intoray-project:masterfrom
dragongu:fix/repartition

Conversation

@dragongu
Copy link
Copy Markdown
Contributor

@dragongu dragongu commented Jan 5, 2026

Fix StreamingRepartition hang with empty upstream results

Summary

Fix a bug where StreamingRepartitionRefBundler would hang when processing empty datasets (0 rows).

Problem

When upstream operations (e.g., filter, map, etc.) produce an empty result (0 rows), resulting empty RefBundle gets added to _pending_bundles but never gets flushed because:

  1. add_bundle() adds empty bundles (0 rows) to _pending_bundles
  2. _total_pending_rows remains 0
  3. done_adding_bundles() checks len(_pending_bundles) > 0 and calls flush_remaining=True
  4. _try_build_ready_bundle(flush_remaining=True) checks _total_pending_rows > 0 → False, so no flush happens
  5. Empty bundles remain in _pending_bundles forever (memory leak)

Reproduction

import ray
ray.init()
ds = ray.data.range(5).filter(lambda row: row['id'] > 100)
ds = ds.repartition(target_num_rows_per_block=8)
ds.count()

Solution

Changed flush condition in _try_build_ready_bundle() from checking _total_pending_rows > 0 to len(self._pending_bundles) > 0:

# Before:
if flush_remaining and self._total_pending_rows > 0:

# After:
if flush_remaining and len(self._pending_bundles) > 0:

This ensures empty bundles never enter the bundler state, preventing both hangs and memory leaks.

@dragongu dragongu requested a review from a team as a code owner January 5, 2026 11:12
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request fixes a critical bug where StreamingRepartitionRefBundler would hang when processing empty datasets. The fix involves adding an early return in add_bundle for empty bundles. This correctly resolves the hang. A new test case is also added to verify the fix, which is comprehensive.

My main feedback is that while the fix is effective for the hang, it introduces a metrics inconsistency by dropping empty bundles that have already been queued for metrics. I've left a comment with a suggestion for a more robust fix that would also keep the metrics accurate.

When upstream produces empty results (0 rows), empty bundles get stuck in
_pending_bundles and never flushed.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Jan 5, 2026
Copy link
Copy Markdown
Member

@bveeramani bveeramani left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@iamjustinhsu
Copy link
Copy Markdown
Contributor

@dragongu i'm making a huge refactor that includes revamping the streaming ref bundler here: #59093. I ran the reproduction script and it works on that PR. I'm ok with merging this now because my PR is not merging in the near near future
cc: @owenowenisme

Copy link
Copy Markdown
Member

@owenowenisme owenowenisme left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@owenowenisme owenowenisme added the go add ONLY when ready to merge, run all tests label Jan 6, 2026
@bveeramani bveeramani enabled auto-merge (squash) January 6, 2026 07:12
@bveeramani bveeramani merged commit fc78704 into ray-project:master Jan 6, 2026
8 checks passed
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
…-project#59848)

# Fix StreamingRepartition hang with empty upstream results

## Summary
Fix a bug where `StreamingRepartitionRefBundler` would hang when
processing empty datasets (0 rows).

## Problem
When upstream operations (e.g., `filter`, `map`, etc.) produce an empty
result (0 rows), resulting empty `RefBundle` gets added to
`_pending_bundles` but never gets flushed because:
1. `add_bundle()` adds empty bundles (0 rows) to `_pending_bundles`
2. `_total_pending_rows` remains 0
3. `done_adding_bundles()` checks `len(_pending_bundles) > 0` and calls
`flush_remaining=True`
4. `_try_build_ready_bundle(flush_remaining=True)` checks
`_total_pending_rows > 0` → False, so no flush happens
5. Empty bundles remain in `_pending_bundles` forever (memory leak)

## Reproduction
```python
import ray
ray.init()
ds = ray.data.range(5).filter(lambda row: row['id'] > 100)
ds = ds.repartition(target_num_rows_per_block=8)
ds.count()
```

## Solution
Changed flush condition in `_try_build_ready_bundle()` from checking
`_total_pending_rows > 0` to `len(self._pending_bundles) > 0`:

```python
# Before:
if flush_remaining and self._total_pending_rows > 0:

# After:
if flush_remaining and len(self._pending_bundles) > 0:
```

This ensures empty bundles never enter the bundler state, preventing
both hangs and memory leaks.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
…-project#59848)

# Fix StreamingRepartition hang with empty upstream results

## Summary
Fix a bug where `StreamingRepartitionRefBundler` would hang when
processing empty datasets (0 rows).

## Problem
When upstream operations (e.g., `filter`, `map`, etc.) produce an empty
result (0 rows), resulting empty `RefBundle` gets added to
`_pending_bundles` but never gets flushed because:
1. `add_bundle()` adds empty bundles (0 rows) to `_pending_bundles`
2. `_total_pending_rows` remains 0
3. `done_adding_bundles()` checks `len(_pending_bundles) > 0` and calls
`flush_remaining=True`
4. `_try_build_ready_bundle(flush_remaining=True)` checks
`_total_pending_rows > 0` → False, so no flush happens
5. Empty bundles remain in `_pending_bundles` forever (memory leak)

## Reproduction
```python
import ray
ray.init()
ds = ray.data.range(5).filter(lambda row: row['id'] > 100)
ds = ds.repartition(target_num_rows_per_block=8)
ds.count()
```

## Solution
Changed flush condition in `_try_build_ready_bundle()` from checking
`_total_pending_rows > 0` to `len(self._pending_bundles) > 0`:

```python
# Before:
if flush_remaining and self._total_pending_rows > 0:

# After:
if flush_remaining and len(self._pending_bundles) > 0:
```

This ensures empty bundles never enter the bundler state, preventing
both hangs and memory leaks.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
…-project#59848)

# Fix StreamingRepartition hang with empty upstream results

## Summary
Fix a bug where `StreamingRepartitionRefBundler` would hang when
processing empty datasets (0 rows).

## Problem
When upstream operations (e.g., `filter`, `map`, etc.) produce an empty
result (0 rows), resulting empty `RefBundle` gets added to
`_pending_bundles` but never gets flushed because:
1. `add_bundle()` adds empty bundles (0 rows) to `_pending_bundles`
2. `_total_pending_rows` remains 0
3. `done_adding_bundles()` checks `len(_pending_bundles) > 0` and calls
`flush_remaining=True`
4. `_try_build_ready_bundle(flush_remaining=True)` checks
`_total_pending_rows > 0` → False, so no flush happens
5. Empty bundles remain in `_pending_bundles` forever (memory leak)

## Reproduction
```python
import ray
ray.init()
ds = ray.data.range(5).filter(lambda row: row['id'] > 100)
ds = ds.repartition(target_num_rows_per_block=8)
ds.count()
```

## Solution
Changed flush condition in `_try_build_ready_bundle()` from checking
`_total_pending_rows > 0` to `len(self._pending_bundles) > 0`:

```python
# Before:
if flush_remaining and self._total_pending_rows > 0:

# After:
if flush_remaining and len(self._pending_bundles) > 0:
```

This ensures empty bundles never enter the bundler state, preventing
both hangs and memory leaks.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…-project#59848)

# Fix StreamingRepartition hang with empty upstream results

## Summary
Fix a bug where `StreamingRepartitionRefBundler` would hang when
processing empty datasets (0 rows).

## Problem
When upstream operations (e.g., `filter`, `map`, etc.) produce an empty
result (0 rows), resulting empty `RefBundle` gets added to
`_pending_bundles` but never gets flushed because:
1. `add_bundle()` adds empty bundles (0 rows) to `_pending_bundles`
2. `_total_pending_rows` remains 0
3. `done_adding_bundles()` checks `len(_pending_bundles) > 0` and calls
`flush_remaining=True`
4. `_try_build_ready_bundle(flush_remaining=True)` checks
`_total_pending_rows > 0` → False, so no flush happens
5. Empty bundles remain in `_pending_bundles` forever (memory leak)

## Reproduction
```python
import ray
ray.init()
ds = ray.data.range(5).filter(lambda row: row['id'] > 100)
ds = ds.repartition(target_num_rows_per_block=8)
ds.count()
```

## Solution
Changed flush condition in `_try_build_ready_bundle()` from checking
`_total_pending_rows > 0` to `len(self._pending_bundles) > 0`:

```python
# Before:
if flush_remaining and self._total_pending_rows > 0:

# After:
if flush_remaining and len(self._pending_bundles) > 0:
```

This ensures empty bundles never enter the bundler state, preventing
both hangs and memory leaks.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…-project#59848)

# Fix StreamingRepartition hang with empty upstream results

## Summary
Fix a bug where `StreamingRepartitionRefBundler` would hang when
processing empty datasets (0 rows).

## Problem
When upstream operations (e.g., `filter`, `map`, etc.) produce an empty
result (0 rows), resulting empty `RefBundle` gets added to
`_pending_bundles` but never gets flushed because:
1. `add_bundle()` adds empty bundles (0 rows) to `_pending_bundles`
2. `_total_pending_rows` remains 0
3. `done_adding_bundles()` checks `len(_pending_bundles) > 0` and calls
`flush_remaining=True`
4. `_try_build_ready_bundle(flush_remaining=True)` checks
`_total_pending_rows > 0` → False, so no flush happens
5. Empty bundles remain in `_pending_bundles` forever (memory leak)

## Reproduction
```python
import ray
ray.init()
ds = ray.data.range(5).filter(lambda row: row['id'] > 100)
ds = ds.repartition(target_num_rows_per_block=8)
ds.count()
```

## Solution
Changed flush condition in `_try_build_ready_bundle()` from checking
`_total_pending_rows > 0` to `len(self._pending_bundles) > 0`:

```python
# Before:
if flush_remaining and self._total_pending_rows > 0:

# After:
if flush_remaining and len(self._pending_bundles) > 0:
```

This ensures empty bundles never enter the bundler state, preventing
both hangs and memory leaks.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants