Hello, according to this page: Create Dask Arrays — Dask documentation, it's possible to make Dask arrays from HDF5 files.
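The example in that documentation page looks roughly like this (paraphrased from memory; the file name and chunk sizes are just placeholders):

import h5py
import dask.array as da

f = h5py.File('myfile.hdf5', 'r')   # the file has to stay open while x is used
x = da.from_array(f['/data'], chunks=(1000, 1000))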
I have an HDF5 file that cannot fit into memory, which means it has to be read in chunks, so I do this:
import h5py
from dask.distributed import Client, LocalCluster
import dask.array as da
import numpy as np
from scipy import signal
from dask.diagnostics import ProgressBar

cluster = LocalCluster(n_workers=22, threads_per_worker=4)
client = Client(cluster)

def decimate(A, q, axis=0):
    # one chunk per column, so every block can be filtered independently
    return da.from_array(A, chunks=(A.shape[0], 1)).map_blocks(
        signal.decimate, q=q, axis=axis, meta=np.array((), dtype=np.float64)
    ).compute(scheduler='processes')

with h5py.File(h5file, 'r') as F:
    DS = F['mydata']
    # decimate by 5*5*4*4 = 400x in four stages
    A = decimate(decimate(decimate(decimate(DS, 5), 5), 4), 4)
This, however, gives me the following error: “h5py objects cannot be pickled”.
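If I understand it correctly, the problem is the dataset handle itself; the same TypeError can be reproduced outside of my pipeline (assuming any HDF5 file with a 'mydata' dataset):

import pickle
import h5py

with h5py.File(h5file, 'r') as F:
    pickle.dumps(F['mydata'])   # TypeError: h5py objects cannot be pickled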
If I then do this instead:
cluster = LocalCluster(n_workers=1, threads_per_worker=88)
client = Client(cluster)

def decimate(A, q, axis=0):
    return da.from_array(A, chunks=(A.shape[0], 1)).map_blocks(
        signal.decimate, q=q, axis=axis, meta=np.array((), dtype=np.float64)
    ).compute(scheduler='threads')

with h5py.File(h5file, 'r') as F:
    DS = F['mydata']
    A = decimate(decimate(decimate(decimate(DS, 5), 5), 4), 4)
I don't get the error this time, but the processing becomes really slow (much slower than if I just use a plain ThreadPoolExecutor).
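For reference, the ThreadPoolExecutor version I'm comparing against looks roughly like this (a simplified sketch, decimating once instead of in four stages; decimate_column is just an illustrative helper):

from concurrent.futures import ThreadPoolExecutor
import h5py
import numpy as np
from scipy import signal

def decimate_column(dataset, col, q):
    # h5py serializes concurrent reads internally, so this is thread-safe
    return signal.decimate(dataset[:, col], q, axis=0)

with h5py.File(h5file, 'r') as F:
    DS = F['mydata']
    with ThreadPoolExecutor(max_workers=88) as pool:
        # list() forces all reads to finish while the file is still open
        cols = list(pool.map(lambda c: decimate_column(DS, c, 5),
                             range(DS.shape[1])))
A = np.stack(cols, axis=1)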
I don't understand what I am doing wrong here. I hope someone can help me.