
[Data] Ray Data doesn't work with Arrow 14.0.1 #41139

@bveeramani

Description


What happened + What you expected to happen

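Tried using Ray Data with Arrow nightly (14.0.1). It doesn't work: even a minimal pipeline, `ray.data.range_tensor(1).materialize()`, fails with `NotImplementedError: Please implement UnknownExtensionType.__reduce__` while Ray serializes a task argument. I expected the dataset to materialize without errors. The warnings in the log show that Arrow 14.0.1 disables pickle-based deserialization of `pyarrow.PyExtensionType` subclasses by default, and Ray's tensor extension type (`ray/air/util/tensor_extensions/arrow.py`) is still built on `PyExtensionType`.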

2023-11-14 15:04:44,917 INFO worker.py:1686 -- Started a local Ray instance.
/Users/balaji/Documents/GitHub/ray/python/ray/air/util/tensor_extensions/arrow.py:76: FutureWarning: pyarrow.PyExtensionType is deprecated and will refuse deserialization by default. Instead, please derive from pyarrow.ExtensionType and implement your own serialization mechanism.
  super().__init__(pa.list_(dtype))
/Users/balaji/anaconda3/envs/ray-nightly/lib/python3.11/copy.py:92: RuntimeWarning: pickle-based deserialization of pyarrow.PyExtensionType subclasses is disabled by default; if you only ingest trusted data files, you may re-enable this using `pyarrow.PyExtensionType.set_auto_load(True)`.
In the future, Python-defined extension subclasses should derive from pyarrow.ExtensionType (not pyarrow.PyExtensionType) and implement their own serialization mechanism.

  rv = reductor(4)
/Users/balaji/anaconda3/envs/ray-nightly/lib/python3.11/copy.py:92: FutureWarning: pyarrow.PyExtensionType is deprecated and will refuse deserialization by default. Instead, please derive from pyarrow.ExtensionType and implement your own serialization mechanism.
  rv = reductor(4)
2023-11-14 15:04:46,895 INFO plan.py:757 -- Using autodetected parallelism=20 for stage ReadRange to satisfy parallelism at least twice the available number of CPUs (10).
2023-11-14 15:04:46,895 INFO plan.py:762 -- To satisfy the requested parallelism of 20, each read task output is split into 20 smaller blocks.
/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py:733: RuntimeWarning: pickle-based deserialization of pyarrow.PyExtensionType subclasses is disabled by default; if you only ingest trusted data files, you may re-enable this using `pyarrow.PyExtensionType.set_auto_load(True)`.
In the future, Python-defined extension subclasses should derive from pyarrow.ExtensionType (not pyarrow.PyExtensionType) and implement their own serialization mechanism.

  return Pickler.dump(self, obj)
/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py:733: FutureWarning: pyarrow.PyExtensionType is deprecated and will refuse deserialization by default. Instead, please derive from pyarrow.ExtensionType and implement your own serialization mechanism.
  return Pickler.dump(self, obj)
Traceback (most recent call last):
  File "/Users/balaji/Documents/GitHub/ray/repro.py", line 10, in <module>
    ray.data.range_tensor(1).materialize()
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/dataset.py", line 4582, in materialize
    copy._plan.execute(force_read=True)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/plan.py", line 674, in execute
    self._snapshot_blocks = self._snapshot_blocks.compute_to_blocklist()
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 339, in compute_to_blocklist
    blocks, metadata = self._get_blocks_with_metadata()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 314, in _get_blocks_with_metadata
    for block_ref, meta_ref in self._iter_block_partition_refs():
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 514, in __next__
    return outer._get_or_compute(self._pos)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 534, in _get_or_compute
    ) = self._submit_task(j)
        ^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/lazy_block_list.py", line 564, in _submit_task
    .remote(
     ^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/remote_function.py", line 245, in remote
    return func_cls._remote(args=args, kwargs=kwargs, **updated_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/util/tracing/tracing_helper.py", line 310, in _invocation_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/remote_function.py", line 452, in _remote
    return invocation(args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/remote_function.py", line 420, in invocation
    object_refs = worker.core_worker.submit_task(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "python/ray/_raylet.pyx", line 3623, in ray._raylet.CoreWorker.submit_task
    with self.profile_event(b"submit_task"):
  File "python/ray/_raylet.pyx", line 3627, in ray._raylet.CoreWorker.submit_task
    prepare_args_and_increment_put_refs(
  File "python/ray/_raylet.pyx", line 743, in ray._raylet.prepare_args_and_increment_put_refs
    raise e
  File "python/ray/_raylet.pyx", line 734, in ray._raylet.prepare_args_and_increment_put_refs
    prepare_args_internal(core_worker, language, args, args_vector,
  File "python/ray/_raylet.pyx", line 781, in ray._raylet.prepare_args_internal
    ).serialize(arg)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 494, in serialize
    return self._serialize_to_msgpack(value)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 472, in _serialize_to_msgpack
    pickle5_serialized_object = self._serialize_to_pickle5(
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 425, in _serialize_to_pickle5
    raise e
  File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/serialization.py", line 420, in _serialize_to_pickle5
    inband = pickle.dumps(
             ^^^^^^^^^^^^^
  File "/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 88, in dumps
    cp.dump(obj)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/cloudpickle/cloudpickle_fast.py", line 733, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "pyarrow/types.pxi", line 1710, in pyarrow.lib.PyExtensionType.__reduce__
NotImplementedError: Please implement UnknownExtensionType.__reduce__

Versions / Dependencies

Arrow: 14.0.1
Ray: 8ecf2f5 (commit)
Python: 3.11 (per the paths in the traceback)

Reproduction script

import ray

# Fails with pyarrow 14.0.1: NotImplementedError from UnknownExtensionType.__reduce__
ray.data.range_tensor(1).materialize()

Issue Severity

High: It blocks me from completing my task.

Labels: P1 (Issue that should be fixed within a few weeks), bug (Something that is supposed to be working; but isn't), data (Ray Data-related issues)
