Skip to content

DataStore memory usage exceeds pandas for analytics operations on large datasets #552

@wudidapaopao

Description

@wudidapaopao

Summary

When running typical pandas-style analytics (load, filter, groupby, join, concat, etc.) on 10M-row Parquet files, DataStore sometimes uses more peak memory than plain pandas.

Benchmark results

Each test runs in an isolated subprocess. Peak memory is measured via VmHWM (peak resident set size) read from /proc/self/status, so the benchmark is Linux-only.

| Test | Operation | pandas peak | DataStore peak | DS / pd |
|------|-----------|-------------|----------------|---------|
| T01 | Load parquet (10M rows) | 1918 MB | 1953 MB | 1.02x |
| T02 | Filter + column select | 2699 MB | 2699 MB | 1.00x |
| T03 | GroupBy single key | 2153 MB | 2167 MB | 1.01x |
| T05 | Sort + top-N | 2776 MB | 3753 MB | 1.35x |
| T06 | Derived columns (assign) | 2308 MB | 3821 MB | 1.66x |
| T07 | Two-table merge/join | 2487 MB | 4506 MB | 1.81x |
| T08 | Three-table join + agg | 2685 MB | 4655 MB | 1.73x |
| T12 | Filter→join→groupby→sort | 2729 MB | 4399 MB | 1.61x |
| T14 | fillna + conditional + groupby | 2175 MB | 3299 MB | 1.52x |
| T15 | concat + groupby | 3000 MB | 4367 MB | 1.46x |

Especially for join/merge (T07, T08, T12) and concat (T15), DataStore's peak memory is significantly higher than that of pandas (1.46x–1.81x).

Reproduction

Step 1: Generate test data (~205 MB on disk, ~1.5 GB in memory)

# generate_data.py
# Generates the two Parquet files used by the memory benchmark:
#   ./bench_data/products.parquet  (100K rows)
#   ./bench_data/orders.parquet    (10M rows)
# All random columns are drawn from NumPy's global RNG, so the order of the
# statements (and of the columns inside each DataFrame literal) determines the
# generated values. Reordering them changes the data set.
import numpy as np, pandas as pd, os
np.random.seed(42)  # fixed seed: repeated runs produce the same data
DATA_DIR = "./bench_data"
os.makedirs(DATA_DIR, exist_ok=True)  # no error if the directory already exists

# Products (100K rows): sequential ids 1..n_products, a zero-padded name,
# one of ten categories, and a log-normally distributed price.
n_products = 100_000
products = pd.DataFrame({
    'product_id': np.arange(1, n_products + 1),
    'product_name': [f"Product_{i:06d}" for i in range(1, n_products + 1)],
    'category': np.random.choice(['Electronics','Clothing','Home','Sports','Books',
                                   'Toys','Food','Health','Auto','Office'], n_products),
    # Rounded to 2 decimals, then clipped into [0.99, 9999.99].
    'price': np.round(np.random.lognormal(3.5, 1.2, n_products), 2).clip(0.99, 9999.99),
})
# index=False: do not store the DataFrame index in the Parquet file.
products.to_parquet(f"{DATA_DIR}/products.parquet", index=False)

# Orders (10M rows)
n_orders = 10_000_000
orders = pd.DataFrame({
    'order_id': np.arange(1, n_orders + 1),
    # randint's upper bound is exclusive: customer ids fall in 1..1_000_000.
    'customer_id': np.random.randint(1, 1_000_001, n_orders),
    # Drawn from 1..n_products, the same id range as products['product_id'],
    # so every order refers to an existing product.
    'product_id': np.random.randint(1, n_products + 1, n_orders),
    # 2022-01-01 plus a random offset of 0..(365*3 - 1) days.
    'order_date': pd.to_datetime('2022-01-01') + pd.to_timedelta(
        np.random.randint(0, 365*3, n_orders), unit='D'),
    # Upper bound exclusive: quantities fall in 1..19.
    'quantity': np.random.randint(1, 20, n_orders),
    # Same distribution and clipping as products['price'], but drawn
    # independently (it is not looked up from the products table).
    'unit_price': np.round(np.random.lognormal(3.5, 1.2, n_orders), 2).clip(0.99, 9999.99),
    # Uniform choice over ten entries, four of which are 0, so roughly 40% of
    # orders carry no discount. Stored as float.
    'discount_pct': np.random.choice([0,0,0,0,5,10,15,20,25,30], n_orders).astype(float),
    # Categorical columns drawn with explicit probabilities p.
    'status': np.random.choice(['completed','shipped','pending','cancelled','returned'],
                                n_orders, p=[0.60,0.15,0.10,0.10,0.05]),
    'channel': np.random.choice(['web','mobile','app','store','phone'],
                                 n_orders, p=[0.35,0.25,0.20,0.15,0.05]),
    # Exponentially distributed with scale 8.0, rounded to 2 decimals.
    'shipping_cost': np.round(np.random.exponential(8.0, n_orders), 2),
})
orders.to_parquet(f"{DATA_DIR}/orders.parquet", index=False)
print("Done. Files:", os.listdir(DATA_DIR))

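Step 2: Run benchmark (the script below covers a subset of the tests in the table above: T01, T02, T03, T06, T07 and T15)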

# bench_memory.py
"""Compare peak memory: DataStore vs pandas for common operations.

Every test snippet in ``TESTS`` is executed twice, once with pandas and once
with DataStore, each time in a fresh Python subprocess so that the peak
resident set size (``VmHWM`` in ``/proc/self/status``, Linux only) of one run
cannot leak into the next. A summary table is printed at the end.

A run that fails, times out, or does not report a peak is recorded as
``None`` and shown as ``N/A``; it is never reported as ``0MB``.
"""
import subprocess
import sys
import os
import json
import textwrap

DATA_DIR = "./bench_data"
# Upper bound, in seconds, for a single (test, library) subprocess.
TIMEOUT_S = 300

# Test name -> code snippet. Each snippet is pasted verbatim into RUNNER and
# may use ``lib`` (pandas, or a thin DataStore adapter exposing the same
# read_parquet/concat entry points) and ``data`` (the data directory).
TESTS = {
    "T01_load": """
df = lib.read_parquet(f"{data}/orders.parquet")
n = len(df)
""",
    "T02_filter": """
df = lib.read_parquet(f"{data}/orders.parquet")
result = df[df['status'] == 'completed'][['order_id','customer_id','unit_price','quantity']]
n = len(result)
""",
    "T03_groupby": """
df = lib.read_parquet(f"{data}/orders.parquet")
df = df.assign(revenue=df['unit_price'] * df['quantity'])
result = df.groupby('status')['revenue'].sum()
n = len(result)
""",
    "T06_assign": """
df = lib.read_parquet(f"{data}/orders.parquet")
df = df.assign(
    revenue=df['unit_price'] * df['quantity'],
    net_revenue=df['unit_price'] * df['quantity'] * (1 - df['discount_pct'] / 100),
    total_cost=df['unit_price'] * df['quantity'] + df['shipping_cost'],
)
n = len(df)
""",
    "T07_merge": """
orders = lib.read_parquet(f"{data}/orders.parquet")
products = lib.read_parquet(f"{data}/products.parquet")
result = orders.merge(products, on='product_id', how='left')
n = len(result)
""",
    "T15_concat_groupby": """
df = lib.read_parquet(f"{data}/orders.parquet")
sample = df.head(5_000_000)
big = lib.concat([sample, sample], ignore_index=True)
result = big.groupby(['status', 'channel'])['unit_price'].sum()
n = len(result)
""",
}

# Template of the script run in each subprocess. It is filled in with
# str.format, so literal braces that must survive into the child script are
# doubled. ``{data!r}`` inserts the path as a Python string literal, which
# keeps the child script valid even if the path contains quotes or backslashes.
RUNNER = textwrap.dedent("""\
import os, sys
data = {data!r}
lib_name = sys.argv[1]
if lib_name == "pandas":
    import pandas as lib
else:
    from datastore import DataStore as _DS
    class _Lib:
        def read_parquet(self, p): return _DS.from_file(p)
        def concat(self, objs, **kw): return objs[0].concat(objs, **kw)
    lib = _Lib()
{code}
# Read VmHWM
with open("/proc/self/status") as f:
    for line in f:
        if line.startswith("VmHWM"):
            peak_kb = int(line.split()[1])
            print(f"PEAK_MB={{peak_kb // 1024}}")
            break
""")


def parse_peak_mb(stdout):
    """Return the peak memory in MB reported in *stdout*, or ``None``.

    The child prints a single ``PEAK_MB=<int>`` line. If several such lines
    are present the last well-formed one wins; malformed lines are ignored.
    """
    peak = None
    for line in stdout.splitlines():
        if line.startswith("PEAK_MB="):
            try:
                peak = int(line.split("=", 1)[1])
            except ValueError:
                continue
    return peak


def run_test(code, lib_name, data_dir=DATA_DIR, timeout=TIMEOUT_S):
    """Run one test snippet with one library in a fresh subprocess.

    Args:
        code: snippet from ``TESTS`` to paste into ``RUNNER``.
        lib_name: ``"pandas"`` or anything else (treated as DataStore).
        data_dir: directory holding the Parquet files.
        timeout: seconds after which the subprocess is killed.

    Returns:
        ``(peak_mb, status, detail)`` where ``peak_mb`` is an int or ``None``,
        ``status`` is ``"OK"``, ``"FAIL"`` or ``"TIMEOUT"``, and ``detail`` is
        the last non-empty stderr line of a failed run (``""`` otherwise).
    """
    script = RUNNER.format(data=data_dir, code=code)
    try:
        proc = subprocess.run(
            [sys.executable, "-c", script, lib_name],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # One slow test must not abort the remaining measurements.
        return None, "TIMEOUT", f"no result after {timeout}s"

    peak = parse_peak_mb(proc.stdout)
    if proc.returncode != 0 or peak is None:
        # Surface the reason instead of silently recording a 0 MB peak.
        stderr_lines = [ln for ln in proc.stderr.splitlines() if ln.strip()]
        detail = stderr_lines[-1] if stderr_lines else "no PEAK_MB line in output"
        return None, "FAIL", detail
    return peak, "OK", ""


def format_row(test_name, pd_mb, ds_mb):
    """Format one summary-table row; ``None`` peaks are rendered as ``N/A``.

    The ratio and the warning flag are only produced when both measurements
    exist, so a failed run can never look like a memory win.
    """
    def fmt_mb(mb):
        return f"{mb:>8d}MB" if mb is not None else f"{'N/A':>10s}"

    both_measured = pd_mb is not None and ds_mb is not None
    ratio = f"{ds_mb/pd_mb:.2f}x" if both_measured and pd_mb > 0 else "N/A"
    flag = " ⚠" if both_measured and ds_mb > pd_mb else ""
    return f"{test_name:<25s} {fmt_mb(pd_mb)} {fmt_mb(ds_mb)} {ratio:>8s}{flag}"


def main():
    """Run every test with both libraries and print the comparison table."""
    results = {}
    for test_name, code in TESTS.items():
        results[test_name] = {}
        for lib_name in ("pandas", "datastore"):
            peak, status, detail = run_test(code, lib_name)
            results[test_name][lib_name] = peak
            peak_txt = f"{peak}MB" if peak is not None else "N/A"
            print(f"  [{lib_name:10s}] {test_name:30s} | peak={peak_txt} | {status}")
            if detail:
                print(f"      {detail}", file=sys.stderr)

    print("\n" + "=" * 70)
    print(f"{'Test':<25s} {'pandas':>10s} {'datastore':>10s} {'ratio':>8s}")
    print("-" * 70)
    for test_name, peaks in results.items():
        print(format_row(test_name, peaks.get("pandas"), peaks.get("datastore")))


if __name__ == "__main__":
    main()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions