
[Data] Schema error while writing Parquet files #48102

@bveeramani

Description


What happened + What you expected to happen

I'm writing Parquet files with the number of rows per file configured (num_rows_per_file), and I get the error below:

Traceback (most recent call last):
  File "/home/ray/default/1.py", line 56, in <module>
    dataset.repartition(64).select_columns(['RESOURCE_ID', 'LINK', 'STATUS', 'OPTED_OUT', 'TITLE', 'DESCRIPTION']).materialize().map(parse_record).write_parquet(
  File "ray/data/dataset.py", line 2780, in write_parquet
    self.write_datasink(
  File "ray/data/dataset.py", line 3614, in write_datasink
    self._write_ds = Dataset(plan, logical_plan).materialize()
  File "ray/data/dataset.py", line 4642, in materialize
    copy._plan.execute()
  File "ray/data/exceptions.py", line 89, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(ValueError): ray::Write() (pid=9458, ip=10.0.68.5)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "ray/data/_internal/planner/plan_write_op.py", line 26, in fn
    write_result = datasink_or_legacy_datasource.write(blocks, ctx)
  File "ray/data/_internal/datasource/parquet_datasink.py", line 85, in write
    call_with_retry(
  File "ray/data/_internal/util.py", line 986, in call_with_retry
    raise e from None
  File "ray/data/_internal/util.py", line 973, in call_with_retry
    return f()
  File "ray/data/_internal/datasource/parquet_datasink.py", line 82, in write_blocks_to_path
    writer.write_table(table)
  File "pyarrow/parquet/core.py", line 1094, in write_table
    raise ValueError(msg)
ValueError: Table schema does not match schema used to create file: 
table:
RESOURCE_ID: string
LINK: string
STATUS: string
OPTED_OUT: string
TITLE: null
DESCRIPTION: null
image_bytes: binary vs. 
file:
RESOURCE_ID: string
LINK: string
STATUS: string
OPTED_OUT: string
TITLE: string
DESCRIPTION: null
image_bytes: binary

Versions / Dependencies

Ray 2.37

Reproduction script

import ray
from ray.data._internal.logical.optimizers import _PHYSICAL_RULES
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule

# Disable operator fusion so Map and Write run as separate operators
# and the write receives blocks with independently inferred schemas.
_PHYSICAL_RULES.remove(OperatorFusionRule)


def add_row(row):
    # One block ends up with a string value and the other with only
    # None, so the two blocks infer different types for "data".
    if row["id"] == 0:
        return {"data": "eggs"}
    else:
        return {"data": None}


ray.data.range(2, override_num_blocks=2).map(add_row).write_parquet(
    "data", num_rows_per_file=2
)

Issue Severity

None

Metadata

Labels

P1 (issue that should be fixed within a few weeks)
bug (something that is supposed to be working, but isn't)
data (Ray Data-related issues)
