Memory performance regression in Dask DataFrame when shuffling on disk #7259

@sinclairtarget

Description

What happened:
I've been using Dask locally on my laptop, primarily as "Pandas, but for more data than fits in memory." However, when sorting data, memory consumption grows beyond what my machine has. I posted about this on Stack Overflow a few weeks ago.

To summarize my SO question, I found that memory consumption was scaling with input size and spiking around one part of the computation in a suspicious way.

I've since done some more investigation and it seems that this spike in memory usage is a regression introduced in release 2.14.0. Here's the resource profiler output from running my sort operation using Dask 2.14.0:

[Resource profiler output for Dask 2.14.0]

Note the memory spike to ~6000 MB.

What you expected to happen:
Here is the resource profiler output from running the same sort operation on the same data using Dask 2.13.0:

[Resource profiler output for Dask 2.13.0]

Note that there is no spike.

Minimal Complete Verifiable Example:
Here is my sort operation.

import os
import sys
import dask
import dask.dataframe as dd
from dask.diagnostics import (
    Profiler,
    ResourceProfiler,
    visualize
)


def run(input_path, output_path):
    # Use the single-threaded scheduler so the task ordering is deterministic.
    dask.config.set(scheduler="synchronous")

    filenames = os.listdir(input_path)
    full_filenames = [os.path.join(input_path, f) for f in filenames]

    prof = Profiler()
    rprof = ResourceProfiler()
    with prof, rprof:
        df = dd.read_parquet(full_filenames)
        # set_index triggers a shuffle; on the local scheduler the
        # intermediate partitions are spilled to disk via partd.
        df = df.set_index("follower_id")
        df.to_parquet(output_path)

    visualize([prof, rprof])


if __name__ == "__main__":
    run(sys.argv[1], sys.argv[2])

Anything else we need to know?:
I suspect this regression was introduced by #5977. That PR added a cleanup task to the task graph implementing the shuffle operation. It looks to me like the cleanup task forces the scheduler to pull all the partitions into memory at once. Using Dask 2.14.0 (which includes #5977), the ordering of the tasks in the graph (produced by visualize(color="order")) shows that all of the "collect-1" tasks that read from partd are executed before any of the write tasks, which I assume explains the memory spike.

Using Dask 2.13.0, I can see that the scheduler has an opportunity to order the tasks such that the reads from partd are paired with writes to the output directory so that only one partition is in memory at any time.
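To illustrate why the ordering matters, here is a toy model of the two schedules. This is not Dask's actual scheduler, and the partition count and task names are made up; it just counts how many shuffled partitions are live at once under each ordering:

```python
def peak_live_partitions(schedule):
    """Return the peak number of partitions held in memory for a
    schedule of ("read", i) / ("write", i) tasks."""
    live = set()
    peak = 0
    for op, part in schedule:
        if op == "read":
            live.add(part)       # partition pulled into memory
        elif op == "write":
            live.discard(part)   # partition released after writing
        peak = max(peak, len(live))
    return peak


n = 8  # hypothetical number of output partitions

# Dask 2.13.0-style ordering: each read from partd is immediately
# paired with its write, so only one partition is live at a time.
interleaved = [(op, i) for i in range(n) for op in ("read", "write")]

# Dask 2.14.0-style ordering (with the cleanup barrier): every read
# runs before any write, so all partitions are live simultaneously.
barriered = [("read", i) for i in range(n)] + [("write", i) for i in range(n)]

print(peak_live_partitions(interleaved))  # 1
print(peak_live_partitions(barriered))    # 8
```

Under the interleaved schedule peak memory stays constant, while under the barriered schedule it scales with the number of partitions, i.e. with input size, which matches the observed spike.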

Environment:

  • Python version: 3.8.0
  • Operating System: Ubuntu
  • Install method (conda, pip, source): pip
