[Data] Fix checkpoint filter PyArrow zero-copy conversion error#59839
Merged: bveeramani merged 1 commit into ray-project:master on Jan 7, 2026
Conversation
Contributor
Code Review
This pull request correctly addresses the ArrowInvalid error that occurs during checkpoint filtering when a zero-copy conversion from a PyArrow array to a NumPy array is not possible. Changing zero_copy_only=True to False is the right solution: it allows PyArrow to perform a data copy when necessary, which makes the filter robust without introducing a performance regression in cases where zero-copy conversion remains possible. I have one minor suggestion to improve code consistency.
Signed-off-by: dragongu <andrewgu@vip.qq.com>
Force-pushed from 092ecde to 5503b85
owenowenisme reviewed on Jan 5, 2026

Member
owenowenisme left a comment
@dragongu Do you have a minimal repro script for this?
Thanks
Contributor
Author
@owenowenisme Reproduction scripts have been added.
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request on Jan 13, 2026
…project#59839)

# Fix `ArrowInvalid` error in checkpoint filter when converting PyArrow chunks to NumPy arrays

## Issue

Fixes an `ArrowInvalid` error raised when checkpoint filtering converts PyArrow chunks to NumPy arrays with `zero_copy_only=True`:

```
  File "/usr/local/lib/python3.10/dist-packages/ray/data/checkpoint/checkpoint_filter.py", line 249, in filter_rows_for_block
    masks = list(executor.map(filter_with_ckpt_chunk, ckpt_chunks))
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 621, in result_iterator
    yield _result_or_cancel(fs.pop())
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 319, in _result_or_cancel
    return fut.result(timeout)
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.10/dist-packages/ray/data/checkpoint/checkpoint_filter.py", line 229, in filter_with_ckpt_chunk
    ckpt_ids = ckpt_chunk.to_numpy(zero_copy_only=True)
  File "pyarrow/array.pxi", line 1789, in pyarrow.lib.Array.to_numpy
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Needed to copy 1 chunks with 0 nulls, but zero_copy_only was True
```

This error occurs when checkpoint data is loaded from Ray's object store, where PyArrow buffers may reside in shared memory and cannot be zero-copied to NumPy.
## Reproduction

```python
#!/usr/bin/env python3
import tempfile

import ray
from ray.data import DataContext
from ray.data.checkpoint import CheckpointConfig

ray.init()

with tempfile.TemporaryDirectory() as ckpt_dir, \
        tempfile.TemporaryDirectory() as data_dir, \
        tempfile.TemporaryDirectory() as output_dir:
    # Step 1: Create data
    ray.data.range(10).map(lambda x: {"id": f"id_{x['id']}"}).write_parquet(data_dir)

    # Step 2: Enable checkpointing and write
    ctx = DataContext.get_current()
    ctx.checkpoint_config = CheckpointConfig(
        checkpoint_path=ckpt_dir,
        id_column="id",
        delete_checkpoint_on_success=False,
    )
    ray.data.read_parquet(data_dir).filter(lambda x: x["id"] != "id_0").write_parquet(output_dir)

    # Step 3: Second write triggers checkpoint filtering
    ray.data.read_parquet(data_dir).write_parquet(output_dir)

ray.shutdown()
```

## Solution

Change `to_numpy(zero_copy_only=True)` to `to_numpy(zero_copy_only=False)` in `BatchBasedCheckpointFilter.filter_rows_for_block()`. This allows PyArrow to copy data when necessary.

### Changes

**File**: `ray/python/ray/data/checkpoint/checkpoint_filter.py`

- Line 229: Changed `ckpt_chunk.to_numpy(zero_copy_only=True)` to `ckpt_chunk.to_numpy(zero_copy_only=False)`

### Performance Impact

No performance regression expected. PyArrow only performs a copy when zero-copy conversion is not possible.

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
lee1258561 pushed a commit to pinterest/ray that referenced this pull request on Feb 3, 2026
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request on Feb 3, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request on Feb 25, 2026
peterxcli pushed a commit to peterxcli/ray that referenced this pull request on Feb 25, 2026