Closed
Description
This issue is a simplified version of another issue and reproduces the problem more clearly.
When the script below is run with dask 0.17.2 through 0.19.4, the output HDF5 file is written to disk only when the processing graph is about to complete (i.e., close to 100% on the progress bar). As a result, memory usage grows significantly and the process crashes. Dask 0.17.1 does not have this problem.
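For scale, the memory involved can be worked out from the shapes and chunk sizes in the script below (float64, 8 bytes per element). The numbers suggest why deferred writes hurt: if the `to_hdf5` writes only happen near the end, every finished (2000, 2000) output block has to stay in memory until then, and together those blocks are as large as the whole result. This is plain arithmetic on the script's parameters, not a diagnosis of the regression.

```python
# Back-of-the-envelope sizes for the script in this issue.
# Shapes and chunk sizes are copied from the script; float64 = 8 bytes.
ITEMSIZE = 8  # bytes per float64 element

number_of_images = 60
image_shape = (40000, 20000)
chunks = (2000, 2000)

# One HDF5 dataset (fake.hdf5); all 60 symlinks point to this same file.
dataset_bytes = image_shape[0] * image_shape[1] * ITEMSIZE

# After da.stack(...).rechunk({0: number_of_images}) each input block
# spans all images: shape (60, 2000, 2000).
input_block_bytes = number_of_images * chunks[0] * chunks[1] * ITEMSIZE

# map_blocks(..., drop_axis=0) reduces each block to shape (2000, 2000).
output_block_bytes = chunks[0] * chunks[1] * ITEMSIZE
n_output_blocks = (image_shape[0] // chunks[0]) * (image_shape[1] // chunks[1])
total_output_bytes = n_output_blocks * output_block_bytes

# str.format instead of f-strings, so this also runs on Python 3.5.
for name, value in [
    ("one dataset", dataset_bytes),
    ("one stacked input block", input_block_bytes),
    ("one output block", output_block_bytes),
    ("all output blocks", total_output_bytes),
]:
    print("{:>24}: {:.3f} GB".format(name, value / 1e9))
print("number of output blocks: {}".format(n_output_blocks))
```

With these parameters a single stacked input block is about 1.92 GB, and the 200 output blocks add up to about 6.4 GB, the same size as one input dataset.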
The script below does the following (see the code for details):
- It creates a subdirectory called `test_data` in the current directory.
- It creates 60 HDF5 files' worth of fake data in `test_data`: one 40000 x 20000 dataset in `fake.hdf5` plus 60 symbolic links to it (`data_0.hdf5` to `data_59.hdf5`). Required disk space: about 6 GB. Note that I tried to write an example that uses data generated on the fly (for example with `dask.array.random.random`) instead of actual files on disk, but that does not reproduce the problem.
- It loads the HDF5 files into a list of 2D dask arrays and stacks them.
- It applies `map_blocks` to the stacked array (mean over the stacking axis) and saves the result to `result.hdf5`.
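The load/stack/rechunk/`map_blocks` steps listed above can be tried on a toy-sized array to see what the graph computes. This is a minimal sketch: the sizes are hypothetical and the inputs are in-memory NumPy arrays rather than HDF5 datasets, so it is not meant to reproduce the memory problem (on-the-fly data did not reproduce it either).

```python
import numpy as np
import dask.array as da

# Hypothetical, scaled-down sizes (the issue uses 60 images of 40000 x 20000).
number_of_images = 6
image_shape = (8, 6)
chunks = (4, 3)

# In-memory stand-ins for the HDF5 datasets (NOT read from disk).
rng = np.random.RandomState(0)
images = [rng.random_sample(image_shape) for _ in range(number_of_images)]
dask_arrays_list = [da.from_array(img, chunks=chunks) for img in images]

# Stack along a new axis 0, then put all images for one tile in a single block.
dask_array = da.stack(dask_arrays_list, axis=0).rechunk({0: number_of_images})
print(dask_array.chunksize)  # (6, 4, 3)

# Each block has shape (6, 4, 3); the mean over axis 0 drops that axis.
result = da.map_blocks(lambda x: np.mean(x, axis=0),
                       dask_array, dtype='f8', drop_axis=0)
print(result.shape, result.chunksize)  # (8, 6) (4, 3)

# The blockwise mean equals the per-pixel mean over all images.
computed = result.compute()
print(np.allclose(computed, np.mean(np.stack(images), axis=0)))  # True
```

Because the rechunk puts the whole stacking axis into one block, each `map_blocks` call sees every image for its tile, which is what makes a per-block `np.mean(x, axis=0)` correct here.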
```python
from pathlib import Path
import numpy as np
import h5py
import dask.array as da
import dask.diagnostics as dd

# define chunks
chunks = (2000, 2000)

# create subdirectory in current dir
directory = Path('./test_data')
directory.mkdir(exist_ok=True)

# create fake images (hdf5 datasets) in subdirectory
number_of_images = 60
random_data = da.random.random(size=(40000, 20000), chunks=chunks)
with dd.ProgressBar():
    da.to_hdf5(directory.joinpath('fake.hdf5'), '/data', random_data)
for image_number in range(number_of_images):
    symlink = directory.joinpath('data_{}.hdf5'.format(image_number))
    if not symlink.exists():
        symlink.symlink_to('fake.hdf5')

# create list of dask arrays based on hdf5 files
filenames = directory.glob('data*.hdf5')
dask_arrays_list = [
    da.from_array(h5py.File(filename, 'r')['/data'],
                  chunks=chunks)
    for filename in filenames
]

# stack dask arrays
dask_array = da.stack(dask_arrays_list, axis=0).rechunk({0: number_of_images})

# apply map_blocks to stacked dask arrays
result = da.map_blocks(lambda x: np.mean(x, axis=0),
                       dask_array, dtype='f8', drop_axis=0)

# compute/save to hdf5
with dd.ProgressBar():
    da.to_hdf5('result.hdf5', '/data', result)
```

Version Information
- Python version: 3.5.2
- Dask 0.17.1; `pip freeze` output: dask==0.17.1, h5py==2.8.0, numpy==1.15.2, six==1.11.0, toolz==0.9.0
- Dask 0.19.4; `pip freeze` output: dask==0.19.4, h5py==2.8.0, numpy==1.15.2, six==1.11.0, toolz==0.9.0