What happened:
`to_parquet` tasks don't appear to fuse with the rest of the linear chain. Since fusing chains is the current workaround for dask/distributed#3974, this results in larger-than-necessary memory usage, because workers pick up work from other partitions earlier than they should.
What you expected to happen:
`to_parquet` tasks should be merged into the linear chain like the other ops are. See the code below for more explanation.
Minimal Complete Verifiable Example:
```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(np.random.randn(100, 100), columns=[str(x) for x in range(100)])
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet("input")

ddf = dd.read_parquet("input")
ddf = ddf.to_parquet("test", compute=False)
ddf.visualize(optimize_graph=True)
```

This is the unoptimized graph:

*(graph image)*

This is the optimized graph:

*(graph image)*
Ideally the `to_parquet` task would be part of the fused chain, so that workers have no chance to prematurely switch to working on a different chain. I'm not aware of any reason the `to_parquet` task should remain unfused.
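To illustrate what fusion buys here, below is a toy sketch of collapsing a strictly linear read → transform → write chain into a single task. This is a hypothetical helper operating on a plain dict-style task graph, not dask's actual optimizer (`dask.optimization.fuse`); the key names and functions are made up for illustration. Once fused, a worker executes the whole chain for one partition before anything else can be scheduled in between.

```python
def inc(x):
    return x + 1


def double(x):
    return 2 * x


# Toy task graph for one partition: each task is (func, dependency_key).
# Key names ("read-0", etc.) are illustrative, not dask's real keys.
dsk = {
    "read-0": 1,
    "transform-0": (inc, "read-0"),
    "write-0": (double, "transform-0"),
}


def fuse_chain(dsk, out_key):
    """Collapse a strictly linear chain ending at out_key into one task."""
    funcs = []
    key = out_key
    # Walk backwards while each task is a (func, dep) pair.
    while isinstance(dsk.get(key), tuple):
        func, dep = dsk[key]
        funcs.append(func)
        key = dep
    base = dsk[key]  # the leaf value the chain starts from

    def fused(value=base, funcs=tuple(reversed(funcs))):
        # Apply the whole chain in order, in a single task.
        for f in funcs:
            value = f(value)
        return value

    return {out_key: (fused,)}


fused_dsk = fuse_chain(dsk, "write-0")
# The three tasks are now one, so no other chain can interleave:
result = fused_dsk["write-0"][0]()  # double(inc(1)) == 4
```

The point of the issue is that dask's linear fusion already does exactly this for the read/transform tasks in the example above, but stops short of pulling the terminal `to_parquet` task into the fused group.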
Anything else we need to know?:
Environment:
- Dask version: 2021.11.2
- Python version: 3.8.x
- Operating System:
- Install method (conda, pip, source):

