Issue Description
I've noticed that since version 0.17.2 (up to 0.17.5), the output hdf5 file (generated by the code below) is written to disk only when the processing graph is nearly complete (i.e., close to 100% according to the progress bar). As a result, intermediate results accumulate in memory, usage grows significantly, and the process eventually crashes. This is not a problem for version 0.17.1.
```python
import numpy as np
import dask.array as da
import dask.diagnostics as dd

chunks = (2000, 2000)

# create list of dask arrays based on memmap
number_of_images = 60
dask_arrays_list = [
    da.from_array(np.memmap('data/data_{}.img'.format(idx),
                            shape=(65802, 29372), dtype='>f4',
                            offset=0, mode='r'),
                  chunks=chunks)[19296:42509, 0:29372].rechunk(chunks)
    for idx in range(number_of_images)
]

# stack dask arrays
dask_array = da.stack(dask_arrays_list, axis=0).rechunk({0: number_of_images})

# apply map_blocks to the stacked dask array
result = da.map_blocks(lambda x: np.mean(x, axis=0),
                       dask_array, dtype='f8', drop_axis=0)

# compute/save to hdf5
with dd.ProgressBar():
    da.to_hdf5('data_test.hdf5', '/data', result)
```

Note that this snippet only serves to reproduce the problem: the slice-plus-rechunk (`[19296:42509, 0:29372].rechunk(chunks)`) is not necessary for this example (in fact, removing it resolves the issue for this particular case), and the `mean()` in the lambda passed to `map_blocks()` is a placeholder for a more involved computation.
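For reference, the per-block computation here is just an axis-0 mean over the stacked images. A NumPy-only sketch with toy shapes (not the real `(65802, 29372)` data) shows what the lambda produces for each block:

```python
import numpy as np

# Toy stand-in for one chunk of the stacked array: 60 "images",
# each a small 2-D block (the real blocks are 2000x2000).
number_of_images = 60
block = np.full((number_of_images, 4, 5), 7777, dtype='>f4')

# This is what the lambda passed to map_blocks computes per block:
# a mean over the stacked axis, which drop_axis=0 then removes
# from the output's shape.
reduced = np.mean(block, axis=0)

print(reduced.shape)          # (4, 5) -- stacking axis dropped
print(float(reduced[0, 0]))   # 7777.0
```

Because `rechunk({0: number_of_images})` puts all 60 images into each block along axis 0, every output chunk is independent, which is why the late write-out (rather than the computation itself) is the suspicious part.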
I did some digging, and the problem is resolved if these lines:

```python
deps = [d for d in dependents[item]
        if d not in result and not (d in seen and waiting[d])]
```

are switched back to:

```python
deps = [d for d in dependents[item] if d not in result]
```

This change/improvement was introduced in this commit.
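To see how the two filters can diverge, here is a toy illustration. The variable names mirror those in the snippet above, but the data is entirely invented and does not reflect dask's actual internal ordering state: when a dependent has been seen but is still waiting on other tasks, the newer filter excludes it, changing which tasks are considered next.

```python
# Hypothetical stand-ins for the ordering state (invented data).
dependents = {'a': ['b', 'c', 'd']}
result = {'b': 0}                    # 'b' is already ordered
seen = {'c'}                         # 'c' has been visited
waiting = {'c': {'x'}, 'd': set()}   # 'c' still waits on task 'x'

item = 'a'

# Old filter (0.17.1): only drop dependents already in the result.
old_deps = [d for d in dependents[item] if d not in result]

# New filter (0.17.2+): additionally drop seen dependents that are
# still waiting on other tasks.
new_deps = [d for d in dependents[item]
            if d not in result and not (d in seen and waiting[d])]

print(old_deps)  # ['c', 'd']
print(new_deps)  # ['d']
```

A different traversal order here could plausibly delay the tasks that feed the hdf5 writes, which would match the observed late write-out, but that is my speculation rather than a confirmed diagnosis.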
I'm not sure how this impacts other use cases or whether this is expected behavior. Any suggestions?
Thanks!
Version Information

- Python version: 3.5.2
- Dask 0.17.1; pip freeze output:

```
click==6.7
cloudpickle==0.5.3
dask==0.17.1
distributed==1.21.8
h5py==2.7.1
HeapDict==1.0.0
locket==0.2.0
msgpack==0.5.6
numpy==1.14.3
pandas==0.23.0
partd==0.3.8
psutil==5.4.5
python-dateutil==2.7.3
pytz==2018.4
six==1.11.0
sortedcontainers==2.0.2
tblib==1.3.2
toolz==0.9.0
tornado==5.0.2
zict==0.1.3
```
- Dask 0.17.2; pip freeze output:

```
click==6.7
cloudpickle==0.5.3
dask==0.17.2
distributed==1.21.8
h5py==2.7.1
HeapDict==1.0.0
locket==0.2.0
msgpack==0.5.6
numpy==1.14.3
pandas==0.23.0
partd==0.3.8
psutil==5.4.5
python-dateutil==2.7.3
pytz==2018.4
six==1.11.0
sortedcontainers==2.0.2
tblib==1.3.2
toolz==0.9.0
tornado==5.0.2
zict==0.1.3
```
- Dask 0.17.5; pip freeze output:

```
click==6.7
cloudpickle==0.5.3
dask==0.17.5
distributed==1.21.8
h5py==2.7.1
HeapDict==1.0.0
locket==0.2.0
msgpack==0.5.6
numpy==1.14.3
pandas==0.23.0
partd==0.3.8
psutil==5.4.5
python-dateutil==2.7.3
pytz==2018.4
six==1.11.0
sortedcontainers==2.0.2
tblib==1.3.2
toolz==0.9.0
tornado==5.0.2
zict==0.1.3
```
Edit:
The script above needs some fake data to run. The following steps generate the required dataset:
- Create a directory called `data`:

```
$ mkdir data
```

- Run this to create a single fake image:

```python
import numpy as np

shape = (65802, 29372)
fake_data = np.memmap('data/data.img',
                      shape=shape, dtype='>f4',
                      offset=0, mode='w+')
fake_data[:, :] = 7777
del fake_data
```

- Create 60 fake images:

```
$ cd data
$ for i in {0..59}; do ln data.img data_$i.img; done
```
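For anyone who prefers to stay in Python, the same setup can be done with the standard library's `os.link`. This sketch uses a temporary directory and a tiny toy shape so it runs quickly; substitute the `data/` directory and `(65802, 29372)` shape from the steps above to reproduce the issue for real:

```python
import os
import tempfile

import numpy as np

# A temporary directory and tiny shape stand in for the issue's
# data/ directory and its (65802, 29372) images.
data_dir = tempfile.mkdtemp()
shape = (8, 8)  # toy; the real script uses (65802, 29372)

# Create the single fake image, as in the memmap step above.
src = os.path.join(data_dir, 'data.img')
fake_data = np.memmap(src, shape=shape, dtype='>f4',
                      offset=0, mode='w+')
fake_data[:, :] = 7777
fake_data.flush()
del fake_data

# Hard-link the single image 60 times, as the `ln` loop does,
# so the 60 "images" share one copy of the data on disk.
for i in range(60):
    os.link(src, os.path.join(data_dir, 'data_{}.img'.format(i)))

print(len([f for f in os.listdir(data_dir)
           if f.startswith('data_')]))  # 60
```

Hard links keep the disk footprint at a single image, which is the point of the `ln` loop; a plain copy would also work but needs 60x the space.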