Skip to content

Suboptimal graph structure when read-writing a parquet #8445

@davidhao3300

Description

@davidhao3300

What happened:

to-parquet ops don't appear to "fuse" together with the rest of the linear chain. Since fusing chains is the current workaround for dask/distributed#3974, this results in larger-than-necessary memory usage due to workers picking up work from other partitions earlier than they should.

What you expected to happen:

to-parquet ops should be merged into the linear chain like the other ops do. See code below for more explanation.

Minimal Complete Verifiable Example:

import pandas as pd
import dask.dataframe as dd
import numpy as np

df = pd.DataFrame(np.random.randn(100,100), columns=[str(x) for x in range(100)])

ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet("input")

ddf = dd.read_parquet("input")
ddf = ddf.to_parquet('test', compute=False)
ddf.visualize(optimize_graph=True)

This is the unoptimized graph:

Screen Shot 2021-12-02 at 3 23 53 PM

This is the optimized graph:

Screen Shot 2021-12-02 at 3 24 01 PM

Ideally the to-parquet op would be part of the fused chain. That way workers have no chance to prematurely switch to working on a different chain. I'm not aware of any reason that the to-parquet op should remain unfused.

Anything else we need to know?:

Environment:

  • Dask version: 2021.11.2
  • Python version: 3.8.x
  • Operating System:
  • Install method (conda, pip, source):

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions