Skip to content

BUG: pandas.DataFrame.to_parquet() causing memory leak #55296

@RizzoV

Description

@RizzoV

Pandas version checks

  • I have checked that this issue has not already been reported.

  • I have confirmed this bug exists on the latest version of pandas.

  • I have confirmed this bug exists on the main branch of pandas.

Reproducible Example

import os
import string
import sys
from random import choice, randint
from uuid import uuid4

import pandas as pd
import pyarrow as pa
from memory_profiler import profile

sample_schema = pa.struct(
    [
        ("a", pa.string()),
        (
            "b",
            pa.struct(
                [
                    ("ba", pa.list_(pa.string())),
                    ("bc", pa.string()),
                    ("bd", pa.string()),
                    ("be", pa.list_(pa.string())),
                    (
                        "bf",
                        pa.list_(
                            pa.struct(
                                [
                                    (
                                        "bfa",
                                        pa.struct(
                                            [
                                                ("bfaa", pa.string()),
                                                ("bfab", pa.string()),
                                                ("bfac", pa.string()),
                                                ("bfad", pa.float64()),
                                                ("bfae", pa.string()),
                                            ]
                                        ),
                                    )
                                ]
                            )
                        ),
                    ),
                ]
            ),
        ),
        ("c", pa.int64()),
        ("d", pa.int64()),
        ("e", pa.string()),
        (
            "f",
            pa.struct(
                [
                    ("fa", pa.string()),
                    ("fb", pa.string()),
                    ("fc", pa.string()),
                    ("fd", pa.string()),
                    ("fe", pa.string()),
                    ("ff", pa.string()),
                    ("fg", pa.string()),
                ]
            ),
        ),
        ("g", pa.int64()),
    ]
)


def generate_random_string(str_length: int) -> str:
    return "".join(
        [choice(string.ascii_lowercase + string.digits) for n in range(str_length)]
    )


@profile
def write_to_parquet(df, output_dir):
    output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    df.to_parquet(output_path, schema=pa.schema(sample_schema))
    return output_path


@profile
def write_to_json(df, output_dir):
    output_path = os.path.join(output_dir, f"{uuid4()}.json")
    df.to_json(output_path)
    return output_path


def main():
    output_dir = os.path.abspath(
        sys.argv[1]
    )  # destination for temporary files when outputting Parquets/JSONs

    if not os.path.isdir(output_dir):
        print("Creating output dir", output_dir)
        os.makedirs(output_dir)

    for i in range(10000):
        df = pd.DataFrame.from_dict(generate_random_data())
        # pa.jemalloc_set_decay_ms(0)
        output_path = write_to_parquet(df, output_dir)  # memory leak
        # output_path = write_to_json(df, output_dir)  # stable memory usage
        if output_path is not None:
            os.remove(output_path)


def generate_random_data():
    return {
        "a": [generate_random_string(128)],
        "b": [
            {
                "ba": [generate_random_string(128) for i in range(50)],
                "bc": generate_random_string(128),
                "bd": generate_random_string(128),
                "be": [generate_random_string(128) for i in range(50)],
                "bf": [
                    {
                        "bfa": {
                            "bfaa": generate_random_string(128),
                            "bfab": generate_random_string(128),
                            "bfac": generate_random_string(128),
                            "bfad": randint(0, 2**32),
                            "bfae": generate_random_string(128),
                        }
                    }
                ],
            }
        ],
        "c": [randint(0, 2**32)],
        "d": [randint(0, 2**32)],
        "e": [generate_random_string(128)],
        "f": [
            {
                "fa": generate_random_string(128),
                "fb": generate_random_string(128),
                "fc": generate_random_string(128),
                "fd": generate_random_string(128),
                "fe": generate_random_string(128),
                "ff": generate_random_string(128),
                "fg": generate_random_string(128),
            }
        ],
        "g": [randint(0, 2**32)],
    }


if __name__ == "__main__":
    main()

Issue Description

pandas.DataFrame.to_parquet() causes a memory leak when engine='pyarrow' (default option).

Using another engine (e.g.: engine='fastparquet') or outputting the same data in another format (e.g.: pandas.DataFrame.to_json(), see write_to_json() in the Reproducible Example) avoids the memory leak.

The problem seems to be more pronounced on DataFrames containing nested structs. A sample problematic data schema and a compliant data generator is included in the Reproducible Example.

From the Reproducible Example above:

  • 1st pd.DataFrame.to_parquet() call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74     91.9 MiB     91.9 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76     91.9 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77     92.1 MiB      0.1 MiB           1       df.to_parquet(output_path, schema=pa.schema(sample_schema))
    78     92.1 MiB      0.0 MiB           1       return output_path
  • 2000th call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74    140.1 MiB    140.1 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76    140.1 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77    140.1 MiB      0.0 MiB           1       df.to_parquet(output_path, schema=pa.schema(sample_schema))
    78    140.1 MiB      0.0 MiB           1       return output_path
  • 10000th call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74    330.5 MiB    330.5 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76    330.5 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77    330.5 MiB      0.0 MiB           1       df.to_parquet(output_path, schema=pa.schema(sample_schema))
    78    330.5 MiB      0.0 MiB           1       return output_path

VS the same code but setting engine='fastparquet' in pd.DataFrame.to_parquet()

  • 1st call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74     91.6 MiB     91.7 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76     91.6 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77     91.6 MiB      0.0 MiB           1       df.to_parquet(output_path, engine='fastparquet')
    78     91.6 MiB      0.0 MiB           1       return output_path
  • 10000th call:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    74     91.9 MiB     91.7 MiB           1   @profile
    75                                         def write_to_parquet(df, output_dir):
    76     91.9 MiB      0.0 MiB           1       output_path = os.path.join(output_dir, f"{uuid4()}.parquet")
    77     91.9 MiB      0.0 MiB           1       df.to_parquet(output_path, engine='fastparquet')
    78     91.9 MiB      0.0 MiB           1       return output_path

Expected Behavior

No memory leaks.

Installed Versions

Details INSTALLED VERSIONS ------------------ commit : e86ed37 python : 3.10.9.final.0 python-bits : 64 OS : Darwin OS-release : 22.6.0 Version : Darwin Kernel Version 22.6.0: Fri Sep 15 13:39:52 PDT 2023; root:xnu-8796.141.3.700.8~1/RELEASE_X86_64 machine : x86_64 processor : i386 byteorder : little LC_ALL : None LANG : it_IT.UTF-8 LOCALE : it_IT.UTF-8

pandas : 2.1.1
numpy : 1.26.0
pytz : 2021.3
dateutil : 2.8.2
setuptools : 65.6.3
pip : 23.2.1
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : 4.9.1
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.11.0
pandas_datareader : None
bs4 : 4.11.1
bottleneck : None
dataframe-api-compat: None
fastparquet : 0.8.3
fsspec : 2022.10.0
gcsfs : None
matplotlib : 3.7.0
numba : None
numexpr : None
odfpy : None
openpyxl : 3.1.2
pandas_gbq : None
pyarrow : 13.0.0
pyreadstat : None
pyxlsb : None
s3fs : 2022.10.0
scipy : 1.10.1
sqlalchemy : None
tables : None
tabulate : None
xarray : None
xlrd : None
zstandard : None
tzdata : 2023.3
qtpy : None
pyqt5 : None

Metadata

Metadata

Assignees

No one assigned

    Labels

    Arrowpyarrow functionalityBugPerformanceMemory or execution speed performanceUpstream issueIssue related to pandas dependency

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions