I am interested in using dask to build a signal-processing pipeline over multivariate time-series data (similar to EEG). Is this the correct way to construct such a pipeline with dask.imperative and an HDF5 file containing datasets of size ~1000x3?
```python
import dask
import h5py
import dask.array as da
from dask.threaded import get
from dask.dot import dot_graph
import dask.multiprocessing
from dask.diagnostics import ProgressBar
from dask.imperative import do, value
import numpy as np

handle = h5py.File('mydata.mat.h5', 'r')

@do
def load(data):
    return da.from_array(data, chunks=1000)

@do
def work(da_data):
    # columns of a 1000x3 array are indexed 0, 1, 2
    da.fft.fft(da_data[:, 0])
    da.fft.fft(da_data[:, 1])
    da.fft.fft(da_data[:, 2])
    return 0

@do
def combine(stuff):
    print(len(stuff))

loaded = [load(np.array(handle[name])) for name in handle.keys()]
freq = [work(t) for t in loaded]
end = combine(freq)

with ProgressBar():
    end.compute(get=dask.multiprocessing.get, num_workers=4)
```
(Total time is about 3.5 seconds on CPU)
I ask because I had to convert my data to a NumPy array to avoid the strange error below from h5py. From searching the internet, it appears to be multiprocessing-related:

```
Exception KeyError: KeyError(139818387277936,) in 'h5py._objects.ObjectID.__dealloc__' ignored
```
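One common workaround for this class of error is to avoid sharing any h5py object (including the open file handle) across process boundaries, and instead have each task open the file itself. Here is a minimal sketch of that pattern using plain `multiprocessing`, assuming the same filename and dataset layout as the snippet above; the function names are hypothetical:

```python
# Workaround sketch: never pickle an h5py handle across processes.
# Each worker task receives only (filename, dataset name) strings and
# opens the HDF5 file itself, so no HDF5 object crosses a process boundary.
import multiprocessing

import h5py
import numpy as np


def load(args):
    """Open the HDF5 file inside the worker and read one dataset."""
    filename, name = args
    with h5py.File(filename, 'r') as f:
        return f[name][...]  # read the dataset into a plain NumPy array


def load_all(filename, num_workers=4):
    """Read every dataset in the file in parallel, one task per dataset."""
    with h5py.File(filename, 'r') as f:
        names = list(f.keys())
    with multiprocessing.Pool(num_workers) as pool:
        return pool.map(load, [(filename, n) for n in names])
```

The same idea carries over to the dask version: pass `(filename, name)` pairs into the `do`-decorated loader rather than arrays read in the parent process, so the `np.array(handle[name])` conversion is no longer needed.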