Reading h5 files: no keys and how to access partitions?

Hi,
so, in short, I am trying to port a pandas-based piece of software to Dask. For that purpose I have set up a Jupyter notebook to experiment a little with Dask. I have around 300 .h5 files, each containing over 600 keys (with names that are important). With pandas, I can read the files in a with-context, access the list of keys, and iterate through them, processing every single one.

When reading with Dask, every key turns into a partition. When printing the divisions, it simply prints None for every key. Is there a way to get access to the names of the keys, as they are important for the processing?

I also tried iterating through the partitions. I’ve seen the map function, but in my case I need to add the name of the key to a configuration file, which requires knowing which partition I am currently processing. Also, every partition needs to be processed step after step, as there is information added to it. Is there a way I can get the number of the partition I am in? (I’ve compared the number of partitions to the number of keys and they were always identical, so I think it’s safe to go by that.)

I know this sounds like I could do everything with pandas, but there is also npz and csv processing in the software, with files that will not fit in memory; for now, though, I am stuck on handling the h5 files. :slight_smile:

Hi @MarcoPasta, welcome to this forum!

In order to better answer your question, could you build a minimum reproducible example, or at least give some code snippets?

When you work with Pandas, do you have access to the name of the key from the DataFrame object? Or do you read it externally before feeding the data to Pandas? If you need the keys, it seems to me you should try adding them to the DataFrame object when working with Dask.

Maybe a solution would also be to work with Delayed instead of Dask DataFrame in this case. We need some code to better understand and give advice.

This might get tricky: do you need to process the partitions one by one, sequentially? Then Dask won’t be a good fit. Or do you need to process sequentially inside one partition?

You could use dask.dataframe.DataFrame.map_partitions — Dask documentation.

The function applied to each partition. If this function accepts the special partition_info keyword argument, it will receive information on the partition’s relative location within the dataframe.

You could still stick with Pandas for the beginning of your workflow and use Dask afterwards.

Hi @guillaumeeb and thanks for your reply!

A simple code example could be the following:

import pandas as pd
import dask.dataframe as dd

file="path/to/file.h5"

data: pd.DataFrame
with pd.HDFStore(file, "r") as h5store: 
    print(list(h5store.keys()))
    data = h5store.get("spot_x")  # "spot_x" is the name of the key
data[0:5]

This is the code example from my notebook, where I can print every key with the pandas method. When doing the same with Dask, it looks like this:

h5df = dd.read_hdf(file, "/*", mode="r")
h5df.divisions

This gives me a tuple of None values; converting it to a list doesn’t change anything, just Nones…

In the code of the software, it looks something like this:

with pd.HDFStore(file, "r") as h5store:
    for key in h5store.keys():    # gives access to the key name as a string, but not the DF
        print("\ncurrently processed key: ", key, "\n")
        # stuff happens here that I cannot share, but I try to reproduce it
        config["args"]["h5_key"] = key    # this is where the key name is added to the config

This might get tricky: do you need to process the partitions one by one, sequentially?

Technically no, this process could be parallelized. Let’s say we have 64 cores; then we could load and process 64 partitions at the same time (each is around 1 GB in size). But to process a partition, we first need to extract the key name (or at least the partition) to copy into our config, so we can continue processing the data. I can try to build a function and access its partition_info; I completely overlooked that!

Edit: sadly, partition_info also just returns None. :confused:

OK, so I built a little toy example.

First, I generated some data:

import numpy as np
import h5py
import pandas as pd
import dask
import dask.dataframe as dd

d1 = np.random.random(size = (1000,10))
d2 = np.random.random(size = (1200,10))
d3 = np.random.random(size = (1400,10))

hdf = pd.HDFStore('hdf_file.h5')
df1 = pd.DataFrame(d1)
df2 = pd.DataFrame(d2)
df3 = pd.DataFrame(d3)
hdf.put('spot_1', df1, format='table')
hdf.put('spot_2', df2, format='table')
hdf.put('spot_3', df3, format='table')
hdf.close()

I hope this is representative of your use case. I’m able to reproduce your code above with pd.HDFStore.

Indeed, when reading with Dask dataframes, divisions are None, because divisions is

A sequence of partition boundaries along the index

Here, Dask has no way to know the divisions along the index. You could try the sorted_index kwarg, but I’m not sure it applies in your case. However, Dask knows perfectly well the number of partitions, which should correspond to the number of HDF keys (if your data is not too big per key):

file="hdf_file.h5"
h5df = dd.read_hdf(file, "/spot_*", mode="r")
print(h5df.divisions)
print(h5df.npartitions)

returns

(None, None, None, None)
3
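
If you want to try the sorted_index kwarg, a minimal sketch would look like the following. It assumes every key was written with format='table' and that the indices of the keys form one sorted, non-overlapping range, which is not the case for the toy data above (all three keys start at index 0), so it probably won’t help here:

# Sketch only: sorted_index=True tells Dask the index inside each key is
# already sorted, so it can try to derive divisions instead of leaving them
# as None. Assumes sorted, non-overlapping indices across keys.
h5df_sorted = dd.read_hdf(file, "/spot_*", mode="r", sorted_index=True)
print(h5df_sorted.divisions)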

When calling map_partitions, you’ll get the partition index:

def func(partition, partition_info=None):
    print(partition_info)
    return partition

h5df.map_partitions(func).compute()

outputs:

None
{'number': 0, 'division': 0}
{'number': 1, 'division': 0}
{'number': 2, 'division': 0}

The first None is because Dask runs the function once with an empty DataFrame to infer the output metadata. So you might be able to use this number key to infer your HDF key.
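
A hedged sketch of that idea: the keys are listed once with pandas, and partition_info['number'] is assumed to follow the same order as the glob-matched keys (which held for this toy example, but is worth double-checking on real data); the process_with_key function and its keys argument are made up for illustration.

# Sketch: recover the HDF key from the partition number.
# Assumes read_hdf creates one partition per key, in the same order as the
# keys matched by the glob.
with pd.HDFStore(file, "r") as h5store:
    spot_keys = [k for k in h5store.keys() if k.startswith("/spot_")]

def process_with_key(partition, keys=None, partition_info=None):
    if partition_info is None:
        # metadata-inference call on an empty DataFrame
        return partition
    key = keys[partition_info['number']]
    print(f"partition {partition_info['number']} comes from key {key}")
    return partition

h5df.map_partitions(process_with_key, keys=spot_keys, meta=h5df).compute()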

But you can also do things differently, provided this is an embarrassingly parallel use case over all the partitions. You can just use the Delayed API:

def my_func_delayed(h5store, key):
    print(f'processing key {key}')
    data = h5store.get(key).copy()  # make a copy to avoid modifying the input file
    data['key'] = key  # just an example of computation, but it will also help for the next example
    return data

with pd.HDFStore(file, "r") as h5store: 
    results = []
    for key in h5store.keys(): 
        results.append(dask.delayed(my_func_delayed)(h5store, key))
    dask.compute(*results)

outputs:

processing key /spot_2
processing key /spot_1
processing key /spot_3

This allows you to process each key independently, loading data directly on the Dask workers if you use Distributed. You can even see that the keys are not processed in sequential order.
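
One caveat with the snippet above: it passes the open HDFStore handle into the delayed tasks, which works with the local threaded scheduler but may not serialize cleanly to distributed workers. A hedged variant (assuming the file is readable from every worker, e.g. on a shared filesystem) is to pass the path and reopen the store inside each task:

# Sketch: reopen the HDF file inside each task, so only the path and the key
# names need to be shipped to workers.
def my_func_delayed_from_path(path, key):
    with pd.HDFStore(path, "r") as h5store:
        data = h5store.get(key).copy()
    data['key'] = key
    return data

with pd.HDFStore(file, "r") as h5store:
    keys = list(h5store.keys())  # only the key names are read up front

results = [dask.delayed(my_func_delayed_from_path)(file, key) for key in keys]
dask.compute(*results)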

If you really want to build a big Dask DataFrame and be sure to have the correct key, you can use the functions above and build a DataFrame from Delayed objects:

def my_func_map_partitions(partition):
    print(f'processing partition {partition.key[0]}')
    return partition

with pd.HDFStore(file, "r") as h5store: 
    results = []
    for key in h5store.keys(): 
        results.append(dask.delayed(my_func_delayed)(h5store, key))
    ddf = dd.from_delayed(results)
    ddf.map_partitions(my_func_map_partitions, meta=ddf).compute()

outputs:

processing key /spot_1
processing key /spot_1
processing key /spot_2
processing key /spot_3
processing partition /spot_1
processing partition /spot_3
processing partition /spot_2

Again, the first output is due to the metadata inference; you can avoid it by manually telling Dask which metadata to use when calling dd.from_delayed(results).
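
For instance, something along these lines (a sketch, replacing the dd.from_delayed call inside the with block above; the column layout is the one produced by my_func_delayed on the toy data) builds the meta by hand:

# Sketch: hand-built meta with ten float columns named 0..9 plus the 'key'
# column added in my_func_delayed, so Dask does not need to compute a delayed
# object just to infer column names and dtypes.
meta = pd.DataFrame(np.empty((0, 10)))
meta['key'] = pd.Series([], dtype=object)
ddf = dd.from_delayed(results, meta=meta)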

Hope that helps.
