Description
Background
Given a (poorly) partitioned dataset of 15000 partitions of ~3k rows each, with a `_metadata` file, we evaluate e.g.

```python
df = dd.read_parquet(source, engine=engine)  # L1: mostly read _metadata
df.head()                                    # L2: single task
df.size.compute()                            # L3: 15000 tasks
```

with `engine` set to either fastparquet or pyarrow. After this speed-up, L1 takes ~25s with fastparquet vs ~215s with pyarrow.
However, L2 takes ~20s with fastparquet vs ~0.2-0.3s with pyarrow.
Similarly, L3 takes ~20s per partition (per `read-parquet-...` task) on the workers due to re-reading `_metadata`.
This makes processing the data almost 100x slower than it could be, and leads to O(n^2) time in the size of the data when partitions are small.
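The quadratic behaviour follows directly from each of the n read tasks re-parsing a `_metadata` file that itself describes all n row groups. A back-of-the-envelope cost model (hypothetical unit costs, not dask code):

```python
def total_metadata_cost(n_partitions, cost_per_row_group=1):
    # The _metadata footer describes every row group, so a single
    # parse costs O(n_partitions); each of the n_partitions read
    # tasks repeats that parse, giving O(n_partitions**2) total work.
    per_task_cost = n_partitions * cost_per_row_group
    return n_partitions * per_task_cost

# Doubling the partition count quadruples the modelled work.
assert total_metadata_cost(200) == 4 * total_metadata_cost(100)
```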
Steps to reproduce
```python
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd

dask.config.set(scheduler="sync")

ENGINE = "fastparquet"
path = f"data-{ENGINE}"
!rm -rf $path

n = 100
df = pd.DataFrame({"x": np.arange(n), "y": np.arange(n)})
ddf = dd.from_pandas(df, npartitions=1)

%time ddf.to_parquet(path, partition_on="x", engine=ENGINE)
%time ddf = dd.read_parquet(path, engine=ENGINE)
%time ddf.compute()
```

Typical compute time: 1.7s for 100 partitions, 5.7s for 200 partitions, etc.
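The reported timings already scale worse than linearly; a quick sanity check on the numbers above (timings copied from these runs, so treat them as illustrative):

```python
# Compute times observed in the repro above (seconds).
timings = {100: 1.7, 200: 5.7}

# Linear scaling in partition count would predict a 2x increase;
# the observed ratio is ~3.4x, consistent with a superlinear
# (roughly quadratic) cost in the number of partitions.
ratio = timings[200] / timings[100]
print(f"200/100 partition time ratio: {ratio:.2f}")
```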
Line profile/code path
Using line_profiler:

```python
import dask.dataframe.io.parquet.fastparquet

%load_ext line_profiler
%lprun -u 1 -s -m dask.dataframe.io.parquet.fastparquet ddf.compute()
```

We can see that most time is spent on 100x calls to `_analyze_paths` and 100x calls to `ParquetFile(base + fs.sep + "_metadata", ...)` (showing truncated `%lprun` output):
```
...
Total time: 4.86525 s
File: /home/igor/checkouts/third_party/dask/dask/dataframe/io/parquet/fastparquet.py
Function: _determine_pf_parts at line 102

Line #      Hits   Time  Per Hit   % Time  Line Contents
==============================================================
...
   147       100    0.0      0.0      0.2      elif fs.isdir(paths[0]):
   148                                             # This is a directory, check for _metadata, then _common_metadata
   149       100    0.7      0.0     14.8          paths = fs.glob(paths[0] + fs.sep + "*")
   150       100    2.0      0.0     40.5          base, fns = _analyze_paths(paths, fs)
   151       100    0.0      0.0      0.0          if "_metadata" in fns:
   152                                                 # Using _metadata file (best-case scenario)
   153       100    0.0      0.0      0.0              pf = ParquetFile(
   154       100    0.0      0.0      0.0                  base + fs.sep + "_metadata",
   155       100    0.0      0.0      0.0                  open_with=fs.open,
   156       100    0.0      0.0      0.0                  sep=fs.sep,
   157       100    2.2      0.0     44.4                  **kwargs.get("file", {})
   158                                                 )
...
```

Graph construction/serialization time
For possibly related reasons, graph construction/serialization for L3 takes many minutes.