
_metadata is re-read for every partition when using fastparquet engine #5842

@ig248

Description


Background

Given a (poorly) partitioned dataset of 15,000 partitions of ~3k rows each, with a _metadata file, we evaluate:

df = dd.read_parquet(source, engine=engine)  # L1: mostly read _metadata
df.head()  # L2: single task
df.size.compute()  # L3: 15000 tasks

with engine set to either fastparquet or pyarrow. After this speed-up, L1 takes ~25s with fastparquet vs 215s with pyarrow.

However, L2 takes ~20s with fastparquet vs ~0.2-0.3s with pyarrow.

Similarly, L3 takes ~20s per partition (read-parquet-... task) on the workers due to re-reading _metadata.

This makes processing the data almost 100x slower than it needs to be, and leads to O(n^2) total time in the number of partitions when partitions are small.
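The quadratic blow-up can be illustrated with a toy cost model (purely illustrative assumptions, not dask internals): the _metadata footer describes every partition, so its size grows roughly linearly in the partition count, and re-reading it from each read task multiplies that cost by the partition count again.

```python
# Toy cost model (illustrative only, not dask internals): the _metadata
# footer lists every partition, so its size grows ~O(n); if each of the
# n read tasks re-reads the whole footer, total metadata I/O is ~O(n^2).
def total_metadata_cost(n_partitions, cost_per_row_group=1.0):
    footer_size = n_partitions * cost_per_row_group  # footer grows with n
    return n_partitions * footer_size                # re-read once per task

# Doubling the partition count quadruples the total metadata work:
print(total_metadata_cost(200) / total_metadata_cost(100))  # 4.0
```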

Steps to reproduce

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd

dask.config.set(scheduler="sync")
ENGINE = "fastparquet"
path = f"data-{ENGINE}"

!rm -rf $path

n = 100
df = pd.DataFrame({"x": np.arange(n), "y": np.arange(n)})
ddf = dd.from_pandas(df, npartitions=1)

%time ddf.to_parquet(path, partition_on="x", engine=ENGINE)
%time ddf = dd.read_parquet(path, engine=ENGINE)
%time ddf.compute()

Typical compute time: 1.7s for 100 partitions, 5.7s for 200 partitions, etc.
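A rough sanity check, using only the two timings reported above, shows the growth is much closer to quadratic than linear:

```python
# Reported compute times from the reproduction above (partitions -> seconds).
times = {100: 1.7, 200: 5.7}

# Linear scaling would roughly double the time (ratio ~2);
# quadratic scaling would roughly quadruple it (ratio ~4).
ratio = times[200] / times[100]
print(f"200/100 partition time ratio: {ratio:.2f}")  # ~3.35
```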

Line profile/code path

Using line_profiler

import dask.dataframe.io.parquet.fastparquet
%load_ext line_profiler
%lprun -u 1 -s -m dask.dataframe.io.parquet.fastparquet ddf.compute()

We can see that most time is spent in 100 calls to _analyze_paths and 100 calls to ParquetFile(base + fs.sep + "_metadata", ...) (truncated %lprun output):

...
Total time: 4.86525 s
File: /home/igor/checkouts/third_party/dask/dask/dataframe/io/parquet/fastparquet.py
Function: _determine_pf_parts at line 102

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
...
   147       100          0.0      0.0      0.2      elif fs.isdir(paths[0]):
   148                                                   # This is a directory, check for _metadata, then _common_metadata
   149       100          0.7      0.0     14.8          paths = fs.glob(paths[0] + fs.sep + "*")
   150       100          2.0      0.0     40.5          base, fns = _analyze_paths(paths, fs)
   151       100          0.0      0.0      0.0          if "_metadata" in fns:
   152                                                       # Using _metadata file (best-case scenario)
   153       100          0.0      0.0      0.0              pf = ParquetFile(
   154       100          0.0      0.0      0.0                  base + fs.sep + "_metadata",
   155       100          0.0      0.0      0.0                  open_with=fs.open,
   156       100          0.0      0.0      0.0                  sep=fs.sep,
   157       100          2.2      0.0     44.4                  **kwargs.get("file", {})
   158                                                       )
...

Graph construction/serialization time

For possibly related reasons, graph construction/serialization for L3 takes many minutes.

Potentially related

#5391
#5321
#4701
