Tried using Ray Data with Arrow nightly. It doesn't work.
2023-11-14 15:04:44,917 INFO worker.py:1686 -- Started a local Ray instance.
/Users/balaji/Documents/GitHub/ray/python/ray/air/util/tensor_extensions/arrow.py:76: FutureWarning: pyarrow.PyExtensionType is deprecated and will refuse deserialization by default. Instead, please derive from pyarrow.ExtensionType and implement your own serialization mechanism.
super().__init__(pa.list_(dtype))
/Users/balaji/anaconda3/envs/ray-nightly/lib/python3.11/copy.py:92: RuntimeWarning: pickle-based deserialization of pyarrow.PyExtensionType subclasses is disabled by default; if you only ingest trusted data files, you may re-enable this using `pyarrow.PyExtensionType.set_auto_load(True)`.
In the future, Python-defined extension subclasses should derive from pyarrow.ExtensionType (not pyarrow.PyExtensionType) and implement their own serialization mechanism.
rv = reductor(4)
/Users/balaji/anaconda3/envs/ray-nightly/lib/python3.11/copy.py:92: FutureWarning: pyarrow.PyExtensionType is deprecated and will refuse deserialization by default. Instead, please derive from pyarrow.ExtensionType and implement your own serialization mechanism.
rv = reductor(4)
2023-11-14 15:04:46,895 INFO plan.py:757 -- Using autodetected parallelism=20 for stage ReadRange to satisfy parallelism at least twice the available number of CPUs (10).
2023-11-14 15:04:46,895 INFO plan.py:762 -- To satisfy the requested parallelism of 20, each read task output is split into 20 smaller blocks.
/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py:733: RuntimeWarning: pickle-based deserialization of pyarrow.PyExtensionType subclasses is disabled by default; if you only ingest trusted data files, you may re-enable this using `pyarrow.PyExtensionType.set_auto_load(True)`.
In the future, Python-defined extension subclasses should derive from pyarrow.ExtensionType (not pyarrow.PyExtensionType) and implement their own serialization mechanism.
return Pickler.dump(self, obj)
/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py:733: FutureWarning: pyarrow.PyExtensionType is deprecated and will refuse deserialization by default. Instead, please derive from pyarrow.ExtensionType and implement your own serialization mechanism.
return Pickler.dump(self, obj)
Traceback (most recent call last):
File "/Users/balaji/Documents/GitHub/ray/repro.py", line 10, in <module>
ray.data.range_tensor(1).materialize()
File "/Users/balaji/Documents/GitHub/ray/python/ray/data/dataset.py", line 4582, in materialize
copy._plan.execute(force_read=True)
File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/plan.py", line 674, in execute
self._snapshot_blocks = self._snapshot_blocks.compute_to_blocklist()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 339, in compute_to_blocklist
blocks, metadata = self._get_blocks_with_metadata()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 314, in _get_blocks_with_metadata
for block_ref, meta_ref in self._iter_block_partition_refs():
File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 514, in __next__
return outer._get_or_compute(self._pos)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 534, in _get_or_compute
) = self._submit_task(j)
^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 564, in _submit_task
.remote(
^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/remote_function.py", line 245, in remote
return func_cls._remote(args=args, kwargs=kwargs, **updated_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/util/tracing/tracing_helper.py", line 310, in _invocation_remote_span
return method(self, args, kwargs, *_args, **_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/remote_function.py", line 452, in _remote
return invocation(args, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/remote_function.py", line 420, in invocation
object_refs = worker.core_worker.submit_task(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "python/ray/_raylet.pyx", line 3623, in ray._raylet.CoreWorker.submit_task
with self.profile_event(b"submit_task"):
File "python/ray/_raylet.pyx", line 3627, in ray._raylet.CoreWorker.submit_task
prepare_args_and_increment_put_refs(
File "python/ray/_raylet.pyx", line 743, in ray._raylet.prepare_args_and_increment_put_refs
raise e
File "python/ray/_raylet.pyx", line 734, in ray._raylet.prepare_args_and_increment_put_refs
prepare_args_internal(core_worker, language, args, args_vector,
File "python/ray/_raylet.pyx", line 781, in ray._raylet.prepare_args_internal
).serialize(arg)
File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 494, in serialize
return self._serialize_to_msgpack(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 472, in _serialize_to_msgpack
pickle5_serialized_object = self._serialize_to_pickle5(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 425, in _serialize_to_pickle5
raise e
File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 420, in _serialize_to_pickle5
inband = pickle.dumps(
^^^^^^^^^^^^^
File "/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 88, in dumps
cp.dump(obj)
File "/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 733, in dump
return Pickler.dump(self, obj)
^^^^^^^^^^^^^^^^^^^^^^^
File "pyarrow/types.pxi", line 1710, in pyarrow.lib.PyExtensionType.__reduce__
NotImplementedError: Please implement UnknownExtensionType.__reduce__
High: It blocks me from completing my task.
What happened + What you expected to happen
Tried using Ray Data with Arrow nightly. It doesn't work.
Versions / Dependencies
Arrow: 14.0.1
Ray: 8ecf2f5
Reproduction script
Issue Severity
High: It blocks me from completing my task.