
Correct way to create a pipeline with dask imperative, HDF5 and multiprocessing #963

@zumpchke

Description


I am interested in using dask to build a signal-processing pipeline for multivariate time-series data (similar to EEG). Is this the correct way to construct such a pipeline using dask.imperative and an HDF5 file containing datasets of size ~1000x3?

import dask
import h5py
import dask.array as da
from dask.threaded import get 
from dask.dot import dot_graph
import dask.multiprocessing
from dask.diagnostics import ProgressBar
from dask.imperative import do, value
import numpy as np

handle = h5py.File('mydata.mat.h5', 'r')

@do
def load(data):
    return da.from_array(data, chunks=1000)

@do
def work(da_data):
    # Columns of a ~1000x3 dataset are indexed 0, 1, 2 (not 1-3),
    # and the FFTs must be returned or they are simply discarded.
    f0 = da.fft.fft(da_data[:, 0])
    f1 = da.fft.fft(da_data[:, 1])
    f2 = da.fft.fft(da_data[:, 2])
    return f0, f1, f2

@do
def combine(stuff):
    print(len(stuff))

loaded = [load(np.array(handle[name])) for name in handle.keys()]
freq = [work(t) for t in loaded]
end = combine(freq)

with ProgressBar():
    end.compute(get=dask.multiprocessing.get, num_workers=4)

(Total time is about 3.5 seconds on CPU)

I ask because I had to convert my data to a NumPy array to avoid the strange h5py error below. From searching the internet, it seems to be multiprocessing-related.

Exception KeyError: KeyError(139818387277936,) in 'h5py._objects.ObjectID.__dealloc__' ignored
