Skip to content

[Data] rename_column doesn't work with Hive-partitioned columns #58714

@bveeramani

Description

@bveeramani

What happened + What you expected to happen

Traceback (most recent call last):
  File "/Users/balaji/ray/python/ray/data/exceptions.py", line 49, in handle_trace
    return fn(*args, **kwargs)
  File "/Users/balaji/ray/python/ray/data/_internal/plan.py", line 479, in execute_to_iterator
    bundle_iter = itertools.chain([next(gen)], gen)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/interfaces/executor.py", line 34, in __next__
    return self.get_next()
  File "/Users/balaji/ray/python/ray/data/_internal/execution/legacy_compat.py", line 76, in get_next
    bundle = self._base_iterator.get_next(output_split_idx)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 786, in get_next
    bundle = state.get_output_blocking(output_split_idx)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 454, in get_output_blocking
    raise self._exception
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 356, in run
    continue_sched = self._scheduling_loop_step(self._topology)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor.py", line 466, in _scheduling_loop_step
    num_errored_blocks = process_completed_tasks(
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 626, in process_completed_tasks
    raise e from None
  File "/Users/balaji/ray/python/ray/data/_internal/execution/streaming_executor_state.py", line 593, in process_completed_tasks
    bytes_read = task.on_data_ready(
  File "/Users/balaji/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 199, in on_data_ready
    raise ex from None
  File "/Users/balaji/ray/python/ray/data/_internal/execution/interfaces/physical_operator.py", line 194, in on_data_ready
    ray.get(self._pending_block_ref)
  File "/Users/balaji/ray/python/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/balaji/ray/python/ray/_private/client_mode_hook.py", line 104, in wrapper
    return func(*args, **kwargs)
  File "/Users/balaji/ray/python/ray/_private/worker.py", line 2972, in get
    values, debugger_breakpoint = worker.get_objects(
  File "/Users/balaji/ray/python/ray/_private/worker.py", line 1031, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowInvalid): ray::ReadParquet->SplitBlocks(5)() (pid=53285, ip=127.0.0.1)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_operator.py", line 654, in _map_task
    for b_out in map_transformer.apply_transform(block_iter, ctx):
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
    yield from self._post_process(results)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 412, in _apply_transform
    yield from self._block_fn(blocks, ctx)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_operator.py", line 938, in _split_blocks
    for block in blocks:
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
    yield from self._post_process(results)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 84, in _shape_blocks
    for result in results:
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 412, in _apply_transform
    yield from self._block_fn(blocks, ctx)
  File "/Users/balaji/ray/python/ray/data/_internal/planner/plan_read_op.py", line 107, in do_read
    yield from read_task()
  File "/Users/balaji/ray/python/ray/data/datasource/datasource.py", line 416, in __call__
    yield from result
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasource.py", line 479, in read_fragments
    for table in iterate_with_retry(
  File "/Users/balaji/ray/python/ray/data/_internal/util.py", line 1433, in iterate_with_retry
    raise e from None
  File "/Users/balaji/ray/python/ray/data/_internal/util.py", line 1414, in iterate_with_retry
    for item_index, item in enumerate(iterable):
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasource.py", line 602, in _read_batches_from
    yield from _DatasourceProjectionPushdownMixin._apply_rename_to_tables(
  File "/Users/balaji/ray/python/ray/data/datasource/datasource.py", line 175, in _apply_rename_to_tables
    for table in tables:
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasource.py", line 548, in _generate_tables
    for batch in fragment.to_batches(
  File "pyarrow/_dataset.pyx", line 1605, in pyarrow._dataset.Fragment.to_batches
  File "pyarrow/_dataset.pyx", line 3692, in pyarrow._dataset.Scanner.from_fragment
  File "pyarrow/_dataset.pyx", line 3478, in pyarrow._dataset._populate_builder
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: No match for FieldRef.Name(country) in name: string
age: int64
__fragment_index: int32
__batch_index: int32
__last_in_fragment: bool
__filename: string
ray.data.exceptions.SystemException

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/balaji/ray/1.py", line 35, in <module>
    print(ray.data.read_parquet(output_dir).rename_columns({"country": "spam"}).take_all())
  File "/Users/balaji/ray/python/ray/data/dataset.py", line 3535, in take_all
    for row in self.iter_rows():
  File "/Users/balaji/ray/python/ray/data/iterator.py", line 246, in _wrapped_iterator
    for batch in batch_iterable:
  File "/Users/balaji/ray/python/ray/data/iterator.py", line 190, in _create_iterator
    ) = self._to_ref_bundle_iterator()
  File "/Users/balaji/ray/python/ray/data/_internal/iterator/iterator_impl.py", line 27, in _to_ref_bundle_iterator
    ref_bundles_iterator, stats = self._base_dataset._execute_to_iterator()
  File "/Users/balaji/ray/python/ray/data/dataset.py", line 6586, in _execute_to_iterator
    bundle_iter, stats, executor = self._plan.execute_to_iterator()
  File "/Users/balaji/ray/python/ray/data/exceptions.py", line 89, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(ArrowInvalid): ray::ReadParquet->SplitBlocks(5)() (pid=53285, ip=127.0.0.1)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_operator.py", line 654, in _map_task
    for b_out in map_transformer.apply_transform(block_iter, ctx):
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
    yield from self._post_process(results)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 412, in _apply_transform
    yield from self._block_fn(blocks, ctx)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_operator.py", line 938, in _split_blocks
    for block in blocks:
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 102, in __call__
    yield from self._post_process(results)
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 84, in _shape_blocks
    for result in results:
  File "/Users/balaji/ray/python/ray/data/_internal/execution/operators/map_transformer.py", line 412, in _apply_transform
    yield from self._block_fn(blocks, ctx)
  File "/Users/balaji/ray/python/ray/data/_internal/planner/plan_read_op.py", line 107, in do_read
    yield from read_task()
  File "/Users/balaji/ray/python/ray/data/datasource/datasource.py", line 416, in __call__
    yield from result
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasource.py", line 479, in read_fragments
    for table in iterate_with_retry(
  File "/Users/balaji/ray/python/ray/data/_internal/util.py", line 1433, in iterate_with_retry
    raise e from None
  File "/Users/balaji/ray/python/ray/data/_internal/util.py", line 1414, in iterate_with_retry
    for item_index, item in enumerate(iterable):
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasource.py", line 602, in _read_batches_from
    yield from _DatasourceProjectionPushdownMixin._apply_rename_to_tables(
  File "/Users/balaji/ray/python/ray/data/datasource/datasource.py", line 175, in _apply_rename_to_tables
    for table in tables:
  File "/Users/balaji/ray/python/ray/data/_internal/datasource/parquet_datasource.py", line 548, in _generate_tables
    for batch in fragment.to_batches(
  File "pyarrow/_dataset.pyx", line 1605, in pyarrow._dataset.Fragment.to_batches
  File "pyarrow/_dataset.pyx", line 3692, in pyarrow._dataset.Scanner.from_fragment
  File "pyarrow/_dataset.pyx", line 3478, in pyarrow._dataset._populate_builder
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: No match for FieldRef.Name(country) in name: string
age: int64
__fragment_index: int32
__batch_index: int32
__last_in_fragment: bool
__filename: string

Versions / Dependencies

1c37a45

Reproduction script

import os

import pyarrow as pa
import pyarrow.parquet as pq

import ray

# Sample rows: two values for each of the two partition columns.
rows = [
    {"name": "Alice", "age": 25, "country": "US", "year": 2024},
    {"name": "Bob", "age": 30, "country": "US", "year": 2023},
    {"name": "Charlie", "age": 35, "country": "CA", "year": 2024},
    {"name": "David", "age": 40, "country": "CA", "year": 2023},
]

# Materialize the rows as an Arrow table.
arrow_table = pa.Table.from_pylist(rows)

# Destination directory for the Hive-partitioned dataset.
output_dir = "/tmp/hive_dataset"
os.makedirs(output_dir, exist_ok=True)

# Write the table partitioned Hive-style on 'country' and 'year'.
pq.write_to_dataset(
    arrow_table,
    output_dir,
    partition_cols=["country", "year"],  # Hive-style partitioning
    existing_data_behavior="overwrite_or_ignore",
)
print(f"Hive-partitioned dataset written to {output_dir}")


# Renaming a Hive partition column triggers the ArrowInvalid reported above.
ray.data.read_parquet(output_dir).rename_columns({"country": "spam"}).take_all()

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Metadata

Metadata

Assignees

No one assigned

    Labels

    P1 — Issue that should be fixed within a few weeks; bug — Something that is supposed to be working, but isn't; data — Ray Data-related issues

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions