
[Data] Fix checkpoint filter PyArrow zero-copy conversion error #59839

Merged

bveeramani merged 1 commit into ray-project:master from dragongu:fix/checkpoint on Jan 7, 2026
Conversation

@dragongu
Contributor

@dragongu dragongu commented Jan 5, 2026

Fix ArrowInvalid error in checkpoint filter when converting PyArrow chunks to NumPy arrays

Issue

Fixes ArrowInvalid error when checkpoint filtering converts PyArrow chunks to NumPy arrays with zero_copy_only=True:

```
  File "/usr/local/lib/python3.10/dist-packages/ray/data/checkpoint/checkpoint_filter.py", line 249, in filter_rows_for_block
    masks = list(executor.map(filter_with_ckpt_chunk, ckpt_chunks))
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 621, in result_iterator
    yield _result_or_cancel(fs.pop())
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 319, in _result_or_cancel
    return fut.result(timeout)
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/checkpoint/checkpoint_filter.py", line 229, in filter_with_ckpt_chunk
    ckpt_ids = ckpt_chunk.to_numpy(zero_copy_only=True)
  File "pyarrow/array.pxi", line 1789, in pyarrow.lib.Array.to_numpy
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Needed to copy 1 chunks with 0 nulls, but zero_copy_only was True
```

This error occurs when checkpoint data is loaded from Ray's object store, where PyArrow buffers may reside in shared memory and cannot be zero-copied to NumPy.

Reproduction

```python
#!/usr/bin/env python3
import ray
from ray.data import DataContext
from ray.data.checkpoint import CheckpointConfig
import tempfile

ray.init()

with tempfile.TemporaryDirectory() as ckpt_dir, \
     tempfile.TemporaryDirectory() as data_dir, \
     tempfile.TemporaryDirectory() as output_dir:
    # Step 1: Create data
    ray.data.range(10).map(lambda x: {"id": f"id_{x['id']}"}).write_parquet(data_dir)

    # Step 2: Enable checkpointing and write
    ctx = DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        checkpoint_path=ckpt_dir,
        id_column="id",
        delete_checkpoint_on_success=False
    )
    ray.data.read_parquet(data_dir).filter(lambda x: x["id"] != "id_0").write_parquet(output_dir)

    # Step 3: Second write triggers checkpoint filtering
    ray.data.read_parquet(data_dir).write_parquet(output_dir)

ray.shutdown()
```

Solution

Change to_numpy(zero_copy_only=True) to to_numpy(zero_copy_only=False) in BatchBasedCheckpointFilter.filter_rows_for_block(). This allows PyArrow to copy data when necessary.

Changes

File: ray/python/ray/data/checkpoint/checkpoint_filter.py

  • Line 229: Changed ckpt_chunk.to_numpy(zero_copy_only=True) to ckpt_chunk.to_numpy(zero_copy_only=False)

Performance Impact

No performance regression expected. PyArrow will only perform a copy when zero-copy is not possible.

@dragongu dragongu requested a review from a team as a code owner January 5, 2026 02:22
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request correctly addresses the ArrowInvalid error that occurs during checkpoint filtering when a zero-copy conversion from a PyArrow array to a NumPy array is not possible. Changing zero_copy_only=True to False is the right solution, allowing PyArrow to perform a data copy when necessary. This ensures robustness without introducing performance regressions in cases where a zero-copy conversion is possible. I have one minor suggestion to improve code consistency.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
@ray-gardener ray-gardener bot added the `data` (Ray Data-related issues) and `community-contribution` (Contributed by the community) labels Jan 5, 2026
Member

@owenowenisme owenowenisme left a comment


@dragongu Do you have a minimal repro script for this?

Thanks

@dragongu
Contributor Author

dragongu commented Jan 6, 2026

> @dragongu Do you have a minimal repro script for this?
>
> Thanks

@owenowenisme A reproduction script has been added.

@owenowenisme owenowenisme added the `go` (add ONLY when ready to merge, run all tests) label Jan 6, 2026
Member

@owenowenisme owenowenisme left a comment


LGTM

@bveeramani bveeramani merged commit c49b666 into ray-project:master Jan 7, 2026
7 checks passed
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026