-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
What happened:
I've been using Dask locally on my laptop primarily just as "Pandas but for more data than can fit in memory." But I've run into issues with memory consumption scaling to more than I have on my machine when sorting data. I posted about this to Stack Overflow some weeks ago.
To summarize my SO question, I found that memory consumption was scaling with input size and spiking around one part of the computation in a suspicious way.
I've since done some more investigation and it seems that this spike in memory usage is a regression introduced in release 2.14.0. Here's the resource profiler output from running my sort operation using Dask 2.14.0:
Note the memory spike to ~6000mb.
What you expected to happen:
Here is the resource profiler output from running the same sort operation on the same data using Dask 2.13.0:
Note that there is no spike.
Minimal Complete Verifiable Example:
Here is my sort operation.
import os
import sys
import dask
import dask.dataframe as dd
from dask.diagnostics import (
Profiler,
ResourceProfiler,
visualize
)
def run(input_path, output_path):
dask.config.set(scheduler="synchronous")
filenames = os.listdir(input_path)
full_filenames = [os.path.join(input_path, f) for f in filenames]
prof = Profiler()
rprof = ResourceProfiler()
with prof, rprof:
df = dd.read_parquet(full_filenames)
df = df.set_index("follower_id")
df.to_parquet(output_path)
visualize([prof, rprof])
if __name__ == "__main__":
run(sys.argv[1], sys.argv[2])Anything else we need to know?:
I suspect this regression was introduced by #5977. That PR added a cleanup task to the task graph implementing the shuffle operation. It looks to me like the cleanup task forces the scheduler to pull all the partitions into memory. Using Dask 2.14.0 (which includes #5977), the ordering of the tasks in the graph (produced by visualize(color="order") shows that all the "collect-1" tasks that read from partd are executed before any of the write tasks, which I assume explains the memory spike.
Using Dask 2.13.0, I can see that the scheduler has an opportunity to order the tasks such that the reads from partd are paired with writes to the output directory so that only one partition is in memory at any time.
Environment:
- Python version: 3.8.0
- Operating System: Ubuntu
- Install method (conda, pip, source): pip

