Description
I found something interesting related to dask/distributed#1015. I wrote a small test that replicates how the Distributed workers use the zict buffers. The test sets an artificially low memory limit and fills the buffer with DataFrames or Series:
```python
from __future__ import division, print_function
import pandas as pd
import os
import gc
import psutil
import tempfile
from zict import Buffer, File, Func
# To replicate usage in Distributed workers
from distributed.protocol import deserialize_bytes, serialize_bytelist
from distributed.sizeof import safe_sizeof as sizeof


def get_mem_usage():
    # Resident set size of this process, in MB
    process = psutil.Process(os.getpid())
    mem_usage = process.memory_info().rss / float(2 ** 20)
    return mem_usage


def weight(k, v):
    return sizeof(v)


def gen_df(n=10000000):
    return pd.DataFrame({"A": [0] * n})


def run_test(store_df=True, store_series=True, explicit_collect=False):
    memory_limit = 1  # artificially low, so everything spills immediately
    local_dir = tempfile.mkdtemp(prefix='worker-', dir=".")
    path = os.path.join(local_dir, 'storage')
    storage = Func(serialize_bytelist, deserialize_bytes, File(path))
    data = Buffer({}, storage, int(float(memory_limit)), weight)
    # simulate filling the LRU with data
    for iteration in range(30):  # range, not xrange, for Python 3
        df = gen_df()
        if store_df:
            key = "key_{}".format(iteration)
            data[key] = df
        if store_series:
            for c in df.columns:
                key = "key_{}_{}".format(iteration, c)
                data[key] = df[c]
            del c
        del df
        if explicit_collect:
            gc.collect()
        print("Iteration: {:2d} Mem: {:12.1f} MB data.fast = {}".format(
            iteration, get_mem_usage(), data.fast
        ))


run_test()
```

When running with `store_series=False`, the memory usage stays pretty much stable at < 60 MB. Apparently the LRU works fine and the garbage collector triggers properly. However, when storing the series as well, the behavior changes to:
```
Iteration: 0 Mem: 391.8 MB data.fast = <LRU: 0/1 on dict>
Iteration: 1 Mem: 728.1 MB data.fast = <LRU: 0/1 on dict>
Iteration: 2 Mem: 1062.4 MB data.fast = <LRU: 0/1 on dict>
Iteration: 3 Mem: 1394.8 MB data.fast = <LRU: 0/1 on dict>
Iteration: 4 Mem: 1729.1 MB data.fast = <LRU: 0/1 on dict>
Iteration: 5 Mem: 2063.4 MB data.fast = <LRU: 0/1 on dict>
Iteration: 6 Mem: 401.9 MB data.fast = <LRU: 0/1 on dict>
Iteration: 7 Mem: 734.2 MB data.fast = <LRU: 0/1 on dict>
Iteration: 8 Mem: 1066.5 MB data.fast = <LRU: 0/1 on dict>
Iteration: 9 Mem: 1398.8 MB data.fast = <LRU: 0/1 on dict>
Iteration: 10 Mem: 1731.1 MB data.fast = <LRU: 0/1 on dict>
Iteration: 11 Mem: 2063.4 MB data.fast = <LRU: 0/1 on dict>
Iteration: 12 Mem: 2397.7 MB data.fast = <LRU: 0/1 on dict>
Iteration: 13 Mem: 2732.0 MB data.fast = <LRU: 0/1 on dict>
Iteration: 14 Mem: 3066.3 MB data.fast = <LRU: 0/1 on dict>
Iteration: 15 Mem: 3400.7 MB data.fast = <LRU: 0/1 on dict>
Iteration: 16 Mem: 3735.0 MB data.fast = <LRU: 0/1 on dict>
Iteration: 17 Mem: 4069.3 MB data.fast = <LRU: 0/1 on dict>
Iteration: 18 Mem: 4403.6 MB data.fast = <LRU: 0/1 on dict>
Iteration: 19 Mem: 4737.9 MB data.fast = <LRU: 0/1 on dict>
Iteration: 20 Mem: 5072.2 MB data.fast = <LRU: 0/1 on dict>
Iteration: 21 Mem: 5406.5 MB data.fast = <LRU: 0/1 on dict>
Iteration: 22 Mem: 5740.8 MB data.fast = <LRU: 0/1 on dict>
Iteration: 23 Mem: 6075.1 MB data.fast = <LRU: 0/1 on dict>
Iteration: 24 Mem: 6409.4 MB data.fast = <LRU: 0/1 on dict>
Iteration: 25 Mem: 6743.7 MB data.fast = <LRU: 0/1 on dict>
Iteration: 26 Mem: 7077.6 MB data.fast = <LRU: 0/1 on dict>
zsh: killed ./debug_zict.py
```
Apparently the garbage collector has a problem with this situation. It seems to clean up the memory once, around the 2 GB mark, but after that it never triggers automatically again and the process runs out of memory. This would explain the issues observed in the Distributed workers. Note: the problem occurs with both glibc malloc and jemalloc.
I don't think this is a true memory leak, because the problem can be solved by enabling explicit garbage collection (`explicit_collect=True` above), which brings the memory usage back to a constant < 60 MB.
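One plausible contributing factor (my assumption, not verified against this test): CPython's cyclic collector triggers on *object allocation counts*, not on bytes in use, so a handful of huge objects caught in reference cycles can sit unreclaimed indefinitely without ever crossing the collection thresholds. A minimal stdlib-only illustration:

```python
import gc

# The collector's thresholds count objects, not bytes
# (CPython's defaults are typically (700, 10, 10)):
print(gc.get_threshold())

# A large object caught in a reference cycle cannot be freed by
# reference counting alone, no matter how many bytes it holds:
class Node:
    pass

a, b = Node(), Node()
a.big = bytearray(100 * 2 ** 20)  # ~100 MB payload
a.other, b.other = b, a           # reference cycle
del a, b                          # refcounting cannot break the cycle

freed = gc.collect()              # an explicit pass reclaims it
print(freed > 0)                  # True: the cycle was collected here
```

If the Series stored in the buffer participate in similar cycles, they would only be reclaimed by a full collection pass, which explains why the explicit `gc.collect()` helps.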
This raises two questions:
- Do you have any idea why storing `pandas.Series` would throw off the garbage collector? Is this a result of the way the zict buffer works, or rather a pandas problem?
- Considering that explicit garbage collection is a viable workaround: would you mind having an explicit `gc.collect()` somewhere, either in zict's `fast_to_slow` or in the Distributed workers? It would probably make sense even without the weird GC issue, to minimize heap fragmentation.
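To make the proposal concrete, here is a minimal stand-in for a spill buffer (this is *not* zict's actual API, just a sketch) showing where the explicit collection pass would go: right after a value is evicted from memory to disk.

```python
import gc
import os
import pickle
import tempfile


class SpillBuffer:
    """Hypothetical sketch of the proposed fix: keep at most one value
    in memory, pickle older values to disk on eviction, and run an
    explicit gc.collect() after each spill so the evicted object is
    reclaimed promptly even if it sits in a reference cycle."""

    def __init__(self, directory):
        self.directory = directory
        self.fast = {}     # in-memory values (here: at most one key)
        self.slow = set()  # keys that have been spilled to disk

    def _path(self, key):
        return os.path.join(self.directory, key)

    def __setitem__(self, key, value):
        # Evict everything currently in memory to disk first
        for old_key, old_value in list(self.fast.items()):
            with open(self._path(old_key), "wb") as f:
                pickle.dump(old_value, f)
            self.slow.add(old_key)
            del self.fast[old_key]
        self.fast[key] = value
        gc.collect()  # the explicit pass proposed in this issue

    def __getitem__(self, key):
        if key in self.fast:
            return self.fast[key]
        with open(self._path(key), "rb") as f:
            return pickle.load(f)


buf = SpillBuffer(tempfile.mkdtemp())
buf["a"] = [1, 2, 3]
buf["b"] = [4, 5, 6]  # "a" is spilled to disk, then gc.collect() runs
print(buf["a"], buf["b"])
```

In zict itself the natural hook would presumably be the fast-to-slow transition; in the Distributed workers, the point where data is moved out of the in-memory store.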