Reading data slices from multiple HDF5 files

Hello!

Similar to my previous post, I’m processing about a thousand heavy HDF5 files for some data analysis, and I’m doing it with Dask.

My issue concerns the way Dask.distributed handles HDF5 files. I looked through the various GitHub issues trying to understand how this works.

So the thing is that when I create a Dask-oriented function to read the slices concurrently, let’s say like this:

import h5py
import dask.array as da

def stack_vars(file):
    # snap_dict, st_k, y_i and st_i are defined at module level
    with h5py.File(file, 'r') as t_hdf5:
        t_base = t_hdf5['Base']
        dask_arrays = []
        for dset_name in snap_dict.values():
            dset = t_base[dset_name]
            # build a lazy Dask array backed by the HDF5 dataset, then slice it
            array = da.from_array(dset)
            array = array[::st_k, y_i, ::st_i]
            dask_arrays.append(array.T)

    stack = da.stack(dask_arrays, axis=0)

    return stack

And then I map this function to my series of HDF5 files, like below:

x_t_fut = client.map(stack_vars, plot_files)

And finally recover the resulting (not so big, i.e. ~20 GB) data in memory:

x_t = client.gather(x_t_fut)

I end up with an error like this:

TypeError: Could not serialize object of type Array.
Traceback (most recent call last):
  File "/linkhome/rech/genenm01/rgrs001/.conda/envs/jupyter_dask/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
  File "/linkhome/rech/genenm01/rgrs001/.local/lib/python3.8/site-packages/h5py/_hl/base.py", line 372, in __getnewargs__
    raise TypeError("h5py objects cannot be pickled")
TypeError: h5py objects cannot be pickled

So I’d like to ask whether anyone has performed this kind of operation in a Dask distributed framework, and what the actual best practices are for handling several HDF5 files in parallel through h5py (I’ve heard about xarray, but it isn’t really appropriate in my case).

Cheers

Hi @outerspacesloth! I don’t think you need to use client.map or client.gather here, since Dask Array supports reading a chunked HDF5 file with from_array. There’s another example here integrating with h5py. The documentation could definitely be improved, so thanks for asking your question and bringing this up!
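For reference, a minimal sketch of that pattern (the file name, dataset name, and chunk size here are made up for illustration, not taken from your post):

import h5py
import dask.array as da

f = h5py.File('data.h5', 'r')                      # hypothetical file; keep the handle open while computing
x = da.from_array(f['x'], chunks=(100, 100, 100))  # lazy Dask array backed by the HDF5 dataset
sub = x[::2, 0, :].T                               # slicing and transposing stay lazy
result = sub.compute()                             # data is only read from disk here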

Hello!

Thank you for your answer! I actually started with the from_array routine and tried loading the HDF5 files lazily, but I ran into a bottleneck that I couldn’t avoid. I’ll quickly summarize it below.

Let’s say I have a 1.5 TB subset of my database and I want to compute a slice of it that is 1.2 GB, on 4 workers with 10 cores each, in a distributed framework with the same approach as in the tutorial you cited.

If I do this, I run into a two-fold issue. First, even though my final data is only 1.2 GB, the Dask workers completely fill the available memory (~200 GB) and spill to disk. I’m aware that several chunks can be loaded at once, but here it seems like far too much.
Second, the execution time for the data reduction is surprisingly slow, in some cases even slower than a sequential approach, whereas I would expect a substantial speed-up from the distributed reading, slicing, and stacking. What I notice is that the workers tend to saturate their memory and become very slow.

I haven’t found a workaround for this so far, hence my attempt to use map then gather, but I’ll be more than happy to hear any advice you might have 🙂

Hello again! Thanks for providing more detail, that’s very helpful and sorry to hear you’re having memory issues.

Going back to your initial TypeError: h5py objects cannot be pickled, we were able to reproduce this, and it happens during the client.gather step because the stack_vars function returns a Dask collection (rather than, e.g., a NumPy array). Typically, you would create a task graph on the client, which would then be executed on the workers. In this case, though, you have a task graph generated on each worker that is never executed. Then, when you try to return it, you get the pickling error. As an illustrative example, something like this would work, but then you’re essentially doing what from_array already does:

import h5py

def read_chunk(chunk):
    # open the file on the worker and read one chunk eagerly into NumPy
    with h5py.File("tmp.h5", "r") as f:
        dset = f["x"]
        arr = dset[chunk]
    return arr

# on the client: list the chunk slices without reading any data
with h5py.File("tmp.h5", "r") as f:
    dset = f["x"]
    chunks = list(dset.iter_chunks())

futures = client.map(read_chunk, chunks)

For the memory issues, would you be able to share a small example using from_array so we could troubleshoot from there? It should work, and it is the recommended way to work with HDF5 files.

Hello (and Merry Christmas)!

Following your recommendation, I modified the stack_vars function to work with NumPy objects; I can map the function over all HDF5 files with the workers and gather the result in about 15 minutes on 4 workers of 40 cores each, which is pretty quick compared to a sequential read of about 1h30. The data reduction goes from 17 TB to 30 GB here.
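Schematically, the NumPy-based version looks something like this (simplified, reusing the module-level names from my earlier snippets):

import h5py
import numpy as np

def stack_vars_np(file):
    # eager variant: h5py reads the slices straight into NumPy arrays,
    # so the result is picklable and can be gathered by the client
    with h5py.File(file, 'r') as t_hdf5:
        t_base = t_hdf5['Base']
        arrays = []
        for dset_name in snap_dict.values():
            arrays.append(t_base[dset_name][::st_k, 0:j_max, x_i].T)
    return np.stack(arrays, axis=0)

x_t_fut = client.map(stack_vars_np, plot_files)
x_t = client.gather(x_t_fut)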

Now, if I use the apparently equivalent from_array routine and loop over the file list this way:


def stack_vars(file):
    # the file handle is deliberately left open so that the lazy
    # Dask arrays built from its datasets remain readable later
    t_hdf5 = h5py.File(file, 'r')
    t_base = t_hdf5['Base']
    dask_arrays = []
    for dset_name in snap_dict.values():
        dset = t_base[dset_name]
        array = da.from_array(dset)[::st_k, 0:j_max, x_i]
        dask_arrays.append(array.T)

    stack = da.stack(dask_arrays, axis=0)

    return stack

Then

dask_files = [stack_vars(f) for f in files]

Finally

x_t = da.stack(dask_files, axis=0).rechunk('256MiB')
x = x_t.compute()

The time required to recover the 30 GB x array rises to more than 40 minutes, and the task stream starts to get a bit erratic, with workers sometimes idle and big gaps between task executions, as shown below.

Are there things I should optimize in this workflow? Maybe I’m missing some best practices here?

Cheers

Hello and Happy New Year!

Glad that your approach using futures worked! It is strange, though, that things are so much slower with from_array. From your example it’s hard to tell exactly which part is causing the issue (e.g. the stacking, transposing, or re-chunking). The task stream you shared might indicate an issue with worker communication, but it’s hard to say without being able to reproduce the problem.

Would you be able to share a small minimal reproducer so we could dig into this more? Even better, if you could share a minimal reproducer for your futures-based approach too, then we could compare the two.
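For example, something along these lines (a small synthetic file, with made-up names, shapes, and chunking) would already let us profile the from_array path:

import h5py
import numpy as np
import dask.array as da

# create a small synthetic HDF5 file standing in for one of your files
with h5py.File('repro.h5', 'w') as f:
    f.create_dataset('Base/u', data=np.random.rand(200, 64, 64),
                     chunks=(20, 64, 64))

# the from_array path, mirroring stack_vars on a single file
f = h5py.File('repro.h5', 'r')
arr = da.from_array(f['Base/u'])[::2, 0:32, 10].T
x = da.stack([arr], axis=0).rechunk('256MiB').compute()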

Thanks!