H5py objects cannot be pickled, or slow processing

Hello! According to the Create Dask Arrays — Dask documentation page, it's possible to make Dask arrays from HDF5 files.
I have an HDF5 file that cannot fit into memory, which means it has to be read in chunks, so I do this:

import h5py
from dask.distributed import Client, LocalCluster
import dask.array as da
import numpy as np
from scipy import signal

cluster = LocalCluster(n_workers=22, threads_per_worker=4)
client = Client(cluster)

def decimate(A, q, axis=0):
    return da.from_array(A, chunks=(A.shape[0], 1)).map_blocks(
        signal.decimate, q=q, axis=axis, meta=np.array((), dtype=np.float64)
    ).compute(scheduler='processes')

with h5py.File(h5file, 'r') as F:
    DS = F['mydata']
    A = decimate(decimate(decimate(decimate(DS, 5), 5), 4), 4)

This, however, gives me the following error: “h5py objects cannot be pickled”.

If I then do this instead:

cluster = LocalCluster(n_workers=1, threads_per_worker=88)
client = Client(cluster)

def decimate(A, q, axis=0):
    return da.from_array(A, chunks=(A.shape[0], 1)).map_blocks(
        signal.decimate, q=q, axis=axis, meta=np.array((), dtype=np.float64)
    ).compute(scheduler='threads')

with h5py.File(h5file, 'r') as F:
    DS = F['mydata']
    A = decimate(decimate(decimate(decimate(DS, 5), 5), 4), 4)

I don’t get this error; however, the processing becomes really, really slow (much slower than if I just use a ThreadPoolExecutor).

I don’t understand what I am doing wrong here. I hope someone can help me.

Hi @benja, thanks for the question!

For the “h5py objects cannot be pickled” error, I think you can fix this by pulling da.from_array out of your decimate function. Here’s a minimal reproducer:

import h5py
import numpy as np
from scipy import signal
import dask.array as da
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)

# create a fake HDF5 file for testing, closing the write handle
# before reopening it for reading
with h5py.File("tmp/mytestfile.hdf5", "w") as f:
    f.create_dataset("mydataset", (1000, 3), dtype='i')

# read it back in
f = h5py.File('tmp/mytestfile.hdf5', 'r')
dset = f['mydataset']

# one dask chunk per column
dask_array = da.from_array(dset, chunks=(dset.shape[0], 1))

# apply the decimate function to each chunk, three times
decimated_dask_array = dask_array.map_blocks(
    signal.decimate, 5, axis=0, meta=np.array((), dtype=np.float64)
).map_blocks(
    signal.decimate, 5, axis=0, meta=np.array((), dtype=np.float64)
).map_blocks(
    signal.decimate, 4, axis=0, meta=np.array((), dtype=np.float64)
)

# look at the task graph
decimated_dask_array.visualize()

# compute using the distributed scheduler
decimated_dask_array.compute()

In the task graph (the output of visualize() above), you can see that each chunk is processed in parallel.

Additionally, when you call compute(scheduler='processes') after having already started a distributed Client, you’ll be using the local multiprocessing scheduler instead of the distributed one, which is why I’ve removed that argument from the compute() call in this example (see the Dask documentation on scheduling for more).
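
To make the scheduler selection concrete, here is a minimal sketch (the array and its chunking are made up purely for illustration) of how the scheduler= keyword to compute() overrides the default that the distributed Client registers:

import dask.array as da
from dask.distributed import Client

client = Client()  # registers the distributed scheduler as the default

x = da.ones((1000, 4), chunks=(1000, 1))

x.sum().compute()                       # runs on the distributed cluster
x.sum().compute(scheduler='threads')    # override: local threaded scheduler
x.sum().compute(scheduler='processes')  # override: local multiprocessing scheduler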

I think your question has helped uncover a bug in Dask, though, since when I tried to see if your example would also work using the multiprocessing scheduler, I got the same TypeError: h5py objects cannot be pickled:

# same file as above
f = h5py.File('tmp/mytestfile.hdf5', 'r')
dset = f['mydataset']

dask_array = da.from_array(dset, chunks=(dset.shape[0], 1))

# apply decimate function
decimated_dask_array = dask_array.map_blocks(
    signal.decimate, 5, axis=0, meta=np.array((), dtype=np.float64)
).map_blocks(
    signal.decimate, 5, axis=0, meta=np.array((), dtype=np.float64)
).map_blocks(
    signal.decimate, 4, axis=0, meta=np.array((), dtype=np.float64)
)

# computing with the multiprocessing scheduler raises the pickling error
decimated_dask_array.compute(scheduler='processes')

I would encourage you to open up an issue, as I’d expect this to work.
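
In the meantime, a possible workaround for the multiprocessing case (just a sketch, not something tested in this thread — the load_column helper is hypothetical) is to keep h5py objects out of the task graph entirely and open the file inside each task, so that only picklable arguments (a file path, dataset name, and column index) ever cross process boundaries:

import h5py
import dask
import dask.array as da

def load_column(path, name, col):
    # hypothetical helper: each task opens the file itself, so no
    # h5py object ever needs to be pickled
    with h5py.File(path, 'r') as f:
        return f[name][:, col]

path, name = 'tmp/mytestfile.hdf5', 'mydataset'
with h5py.File(path, 'r') as f:
    nrows, ncols = f[name].shape
    dtype = f[name].dtype

# one delayed task per column, mirroring the per-column chunking above
columns = [
    da.from_delayed(dask.delayed(load_column)(path, name, c),
                    shape=(nrows,), dtype=dtype)
    for c in range(ncols)
]
dask_array = da.stack(columns, axis=1)
dask_array.compute(scheduler='processes')  # no pickling error expected here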

Cross-linking to the open GitHub issue: Error pickling h5py objects with multiprocessing scheduler · Issue #8760 · dask/dask
