Skip to content

[Data] dataset to_arrow_refs recomputing full pipeline, doubling compute time #56601

@dyami0123

Description

@dyami0123

What happened + What you expected to happen

When using dataset.to_arrow_refs(), the entire compute pipeline is executed twice: once with limit(1) appended to the end, and a second time without it. This causes the materialization process to take twice as long as an equivalent dataset.to_pandas() call.

example dataset calls in the ray dashboard

Image

Versions / Dependencies

ray Version: 2.48.0

Reproduction script

from __future__ import annotations

import os
import time
import pathlib
from dataclasses import dataclass

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import ray
import ray.data as rd


@dataclass(frozen=True)
class Paths:
    """Immutable record of where the generated Parquet test data lives."""

    # Directory that holds both generated files.
    root: pathlib.Path
    # Path to a.parquet (columns: id, category, val_a).
    a: str
    # Path to b.parquet (columns: id, val_b; some ids deliberately missing).
    b: str


def make_parquet_pairs(rows: int = 10_000) -> Paths:
    """Create two Parquet files sharing an index column ``id``.

    - a.parquet: columns [id, category, val_a]
    - b.parquet: columns [id, val_b] with ~20% of ids dropped so an inner
      join against a.parquet is meaningful.

    Args:
        rows: Number of rows to generate for a.parquet.

    Returns:
        A ``Paths`` record pointing at the data directory and both files.

    Raises:
        ValueError: If either Parquet file is missing after the write.
    """
    # Use pathlib consistently instead of mixing in os.path.join.
    root = pathlib.Path(__file__).parent / "tmp"
    root.mkdir(parents=True, exist_ok=True)
    a_path = root / "a.parquet"
    b_path = root / "b.parquet"

    rng = np.random.default_rng(42)  # fixed seed for a reproducible dataset
    ids = np.arange(rows, dtype=np.int64)

    df_a = pd.DataFrame(
        {
            "id": ids,
            "category": rng.integers(0, 50, size=rows, dtype=np.int16),
            "val_a": rng.normal(loc=100.0, scale=15.0, size=rows).astype(np.float32),
        }
    )

    # Drop ~20% of ids in b to make the join meaningful
    mask = rng.random(rows) > 0.2
    ids_b = ids[mask]
    df_b = pd.DataFrame(
        {
            "id": ids_b,
            "val_b": rng.normal(loc=50.0, scale=7.0, size=len(ids_b)).astype(np.float32),
        }
    )

    # Write as Parquet
    pq.write_table(pa.Table.from_pandas(df_a, preserve_index=False), a_path)
    pq.write_table(pa.Table.from_pandas(df_b, preserve_index=False), b_path)

    # The original check only verified b.parquet; verify both outputs.
    if not (a_path.exists() and b_path.exists()):
        raise ValueError("Expected parquet files not found after write.")

    return Paths(root=root, a=str(a_path), b=str(b_path))


def run_with_materialize(paths: Paths) -> None:
    """Time ``to_arrow_refs()`` vs ``to_pandas()`` on the same pipeline.

    Reads both Parquet files into pandas, builds Ray datasets from them,
    performs an inner join on ``id`` followed by a groupby aggregation,
    then materializes the result twice — once via ``to_arrow_refs()`` and
    once via ``to_pandas()`` — printing a timing for each step so the
    doubled-pipeline behavior of ``to_arrow_refs()`` is visible.

    Args:
        paths: Locations of the two Parquet files produced by
            ``make_parquet_pairs``.
    """
    print("\n=== WITH materialize()-like behavior via explicit materializations ===")

    # Read to pandas first, then rd.from_pandas(), so both datasets start
    # from in-memory blocks rather than lazy Parquet reads.
    df_a = pd.read_parquet(paths.a)
    df_b = pd.read_parquet(paths.b)

    print(f"df_a shape: {df_a.shape}, df_b shape: {df_b.shape}")
    print("df_a head():")
    print(df_a.head())

    ds_a = rd.from_pandas(df_a)
    ds_b = rd.from_pandas(df_b)

    # Inner join on 'id' with a single-partition hint.
    joined = ds_a.join(ds_b, on=("id",), join_type="inner", num_partitions=1)
    # To force a single partition (heavier tasks), uncomment:
    # joined = joined.repartition(1)

    result = joined.groupby("category").sum("val_b")

    # ---------------------------
    # TIMING: Arrow-refs materialization
    # ---------------------------
    print("\n[Timing] to_arrow_refs() submission + full materialization:")
    t0 = time.perf_counter()
    refs = result.to_arrow_refs()  # Triggers execution; returns list of ObjectRefs
    t1 = time.perf_counter()
    print(f"Submission time (to_arrow_refs call only): {t1 - t0:0.3f} s")

    # Wait for all refs to finish and fetch them; measure full time-to-ready
    pending = set(refs)
    materialized_tables: list[pa.Table] = []
    t2 = time.perf_counter()
    while pending:
        ready, pending = ray.wait(list(pending), num_returns=1)
        tbl: pa.Table = ray.get(ready[0])
        materialized_tables.append(tbl)
    t3 = time.perf_counter()
    print(f"Full materialization time (waiting on refs): {t3 - t2:0.3f} s")
    print(f"Total time (submission + materialization): {t3 - t0:0.3f} s")
    if materialized_tables:
        print("Example Arrow table schema from first ref:")
        print(materialized_tables[0].schema)

    # ---------------------------
    # TIMING: Pandas materialization
    # ---------------------------
    print("\n[Timing] to_pandas() full materialization:")
    t4 = time.perf_counter()
    pdf = result.to_pandas()
    t5 = time.perf_counter()
    print(f"to_pandas() time: {t5 - t4:0.3f} s")
    print(f"Pandas result shape: {pdf.shape}")
    print(pdf.head())

    # Summary — the original repeated these prints verbatim at the end of
    # the function; consolidated here under a single labeled section.
    print("\n=== Summary ===")
    print(f"Submission time (to_arrow_refs call only): {t1 - t0:0.3f} s")
    print(f"Full materialization time (waiting on refs): {t3 - t2:0.3f} s")
    print(f"Total time (submission + materialization): {t3 - t0:0.3f} s")
    print(f"to_pandas() time: {t5 - t4:0.3f} s")


def main() -> None:
    """Generate the test data, run the timing comparison, and tear Ray down."""
    # Optional: more verbose progress in logs
    # os.environ.setdefault("RAY_DATASET_VERBOSE_PROGRESS", "1")

    # Start Ray locally; set num_cpus to taste
    ray.init(ignore_reinit_error=True)
    try:
        # NOTE(review): 100M rows yields a very large Parquet pair; reduce
        # this for a quicker reproduction of the same doubled-pipeline issue.
        paths = make_parquet_pairs(rows=100_000_000)
        print(f"Temp data dir: {paths.root}")

        run_with_materialize(paths)
    finally:
        # The original never shut Ray down; ensure the local instance is
        # released even if data generation or the timing run fails.
        ray.shutdown()


if __name__ == "__main__":
    main()


# Submission time (to_arrow_refs call only): 23.407 s
# Full materialization time (waiting on refs): 0.000 s
# Total time (submission + materialization): 23.407 s
# to_pandas() time: 10.907 s

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Metadata

Metadata

Assignees

Labels

P1 (Issue that should be fixed within a few weeks) · bug (Something that is supposed to be working, but isn't) · community-backlog · data (Ray Data-related issues) · performance

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions