-
Notifications
You must be signed in to change notification settings - Fork 7.4k
[Data] dataset to_arrow_refs recomputing full pipeline, doubling compute time #56601
Copy link
Copy link
Closed
Labels
P1 — Issue that should be fixed within a few weeks; bug — Something that is supposed to be working, but isn't; community-backlog; data — Ray Data-related issues; performance
Description
What happened + What you expected to happen
When using dataset.to_arrow_refs() the entire compute pipeline is executed twice, once with limit(1) appended to the end, the second without. This causes the materialization process to take twice as long as an equivalent dataset.to_pandas() call.
example dataset calls in the ray dashboard
Versions / Dependencies
ray Version: 2.48.0
Reproduction script
from __future__ import annotations
import os
import time
import pathlib
from dataclasses import dataclass
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import ray
import ray.data as rd
@dataclass(frozen=True)
class Paths:
    """Filesystem locations of the generated Parquet test data."""
    # Directory holding the generated parquet files.
    root: pathlib.Path
    # Path to a.parquet (columns: id, category, val_a).
    a: str
    # Path to b.parquet (columns: id, val_b).
    b: str
def make_parquet_pairs(rows: int = 10_000) -> Paths:
    """
    Create two Parquet files sharing an index 'id'.
    - a.parquet: columns [id, category, val_a]
    - b.parquet: columns [id, val_b] with some missing ids to exercise the join

    Args:
        rows: Number of rows to generate for table a; table b keeps ~80% of them.

    Returns:
        Paths pointing at the temp directory and both written parquet files.

    Raises:
        ValueError: If either parquet file is missing after the write.
    """
    root = pathlib.Path(__file__).parent / "tmp"
    root.mkdir(exist_ok=True)
    # Use pathlib consistently instead of mixing in os.path.join.
    a_path = root / "a.parquet"
    b_path = root / "b.parquet"

    rng = np.random.default_rng(42)  # fixed seed for reproducibility
    ids = np.arange(rows, dtype=np.int64)
    df_a = pd.DataFrame(
        {
            "id": ids,
            "category": rng.integers(0, 50, size=rows, dtype=np.int16),
            "val_a": rng.normal(loc=100.0, scale=15.0, size=rows).astype(np.float32),
        }
    )
    # Drop ~20% of ids in b to make the join meaningful
    mask = rng.random(rows) > 0.2
    ids_b = ids[mask]
    df_b = pd.DataFrame(
        {
            "id": ids_b,
            "val_b": rng.normal(loc=50.0, scale=7.0, size=len(ids_b)).astype(np.float32),
        }
    )
    # Write as Parquet
    pq.write_table(pa.Table.from_pandas(df_a, preserve_index=False), a_path)
    pq.write_table(pa.Table.from_pandas(df_b, preserve_index=False), b_path)
    # Bug fix: the original only checked b_path even though the error message
    # refers to both files; verify both outputs exist.
    if not a_path.exists() or not b_path.exists():
        raise ValueError("Expected parquet files not found after write.")
    # Paths declares a/b as str, so convert back from pathlib.Path.
    return Paths(root=root, a=str(a_path), b=str(b_path))
def run_with_materialize(paths: Paths) -> None:
    """Build the join+groupby pipeline, then time to_arrow_refs() vs to_pandas()."""
    print("\n=== WITH materialize()-like behavior via explicit materializations ===")
    # Read both parquet files eagerly with pandas, then hand them to Ray Data.
    frame_a = pd.read_parquet(paths.a)
    frame_b = pd.read_parquet(paths.b)
    print(f"df_a shape: {frame_a.shape}, df_b shape: {frame_b.shape}")
    print("df_a head():")
    print(frame_a.head())

    ds_left = rd.from_pandas(frame_a)
    ds_right = rd.from_pandas(frame_b)
    # Inner join on the shared 'id' column, forced into a single partition.
    joined = ds_left.join(ds_right, on=("id",), join_type="inner", num_partitions=1)
    result = joined.groupby("category").sum("val_b")

    # ---------------------------
    # TIMING: Arrow-refs materialization
    # ---------------------------
    print("\n[Timing] to_arrow_refs() submission + full materialization:")
    submit_start = time.perf_counter()
    refs = result.to_arrow_refs()  # triggers execution; returns a list of ObjectRefs
    submit_end = time.perf_counter()
    print(f"Submission time (to_arrow_refs call only): {submit_end - submit_start:0.3f} s")

    # Drain the refs one at a time and fetch them; measure full time-to-ready.
    outstanding = list(refs)
    fetched_tables: list[pa.Table] = []
    wait_start = time.perf_counter()
    while outstanding:
        done, outstanding = ray.wait(outstanding, num_returns=1)
        fetched_tables.append(ray.get(done[0]))
    wait_end = time.perf_counter()
    print(f"Full materialization time (waiting on refs): {wait_end - wait_start:0.3f} s")
    print(f"Total time (submission + materialization): {wait_end - submit_start:0.3f} s")
    if fetched_tables:
        print("Example Arrow table schema from first ref:")
        print(fetched_tables[0].schema)

    # ---------------------------
    # TIMING: Pandas materialization
    # ---------------------------
    print("\n[Timing] to_pandas() full materialization:")
    pandas_start = time.perf_counter()
    pdf = result.to_pandas()
    pandas_end = time.perf_counter()
    print(f"to_pandas() time: {pandas_end - pandas_start:0.3f} s")
    print(f"Pandas result shape: {pdf.shape}")
    print(pdf.head())

    # Repeat the headline numbers as a compact end-of-run summary.
    print(f"Submission time (to_arrow_refs call only): {submit_end - submit_start:0.3f} s")
    print(f"Full materialization time (waiting on refs): {wait_end - wait_start:0.3f} s")
    print(f"Total time (submission + materialization): {wait_end - submit_start:0.3f} s")
    print(f"to_pandas() time: {pandas_end - pandas_start:0.3f} s")
def main(rows: int = 100_000_000) -> None:
    """Generate the parquet pair and run the timing comparison.

    Args:
        rows: Number of rows to generate; defaults to the original repro's
            100M rows. Pass a smaller value for a quick smoke run.
    """
    # Optional: more verbose progress in logs
    # os.environ.setdefault("RAY_DATASET_VERBOSE_PROGRESS", "1")

    # Start Ray locally; ignore_reinit_error lets this re-run inside an
    # already-initialized Ray session (e.g. in a notebook).
    ray.init(ignore_reinit_error=True)
    paths = make_parquet_pairs(rows=rows)
    print(f"Temp data dir: {paths.root}")
    run_with_materialize(paths)
# Script entry point: only runs when executed directly, not on import.
if __name__ == "__main__":
    main()
# Submission time (to_arrow_refs call only): 23.407 s
# Full materialization time (waiting on refs): 0.000 s
# Total time (submission + materialization): 23.407 s
# to_pandas() time: 10.907 s

Issue Severity
Medium: It is a significant difficulty but I can work around it.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
P1 — Issue that should be fixed within a few weeks; bug — Something that is supposed to be working, but isn't; community-backlog; data — Ray Data-related issues; performance