OK, so I built a little toy example.
First, I generated some data:
import numpy as np
import h5py
import pandas as pd
import dask
import dask.dataframe as dd
# Three tables of different lengths, one per HDF key
d1 = np.random.random(size=(1000, 10))
d2 = np.random.random(size=(1200, 10))
d3 = np.random.random(size=(1400, 10))
hdf = pd.HDFStore('hdf_file.h5')
df1 = pd.DataFrame(d1)
df2 = pd.DataFrame(d2)
df3 = pd.DataFrame(d3)
hdf.put('spot_1', df1, format='table')
hdf.put('spot_2', df2, format='table')
hdf.put('spot_3', df3, format='table')
hdf.close()
I hope this is representative of your use case. I'm able to reproduce your code above with pd.HDFStore.
Indeed, when reading with Dask DataFrames, divisions are None, because divisions are "a sequence of partition boundaries along the index". Here, Dask has no way to know the divisions along the index. You could try the sorted_index kwarg, but I'm not sure it applies in your case. However, Dask knows perfectly well the number of partitions, which should correspond to the number of HDF keys (if your data is not too big per key):
file="hdf_file.h5"
h5df = dd.read_hdf(file, "/spot_*", mode="r")
print(h5df.divisions)
print(h5df.npartitions)
returns
(None, None, None, None)
3
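As a side note on the sorted_index kwarg: it only makes sense if the index is globally sorted and non-overlapping across keys, which is not the case in this toy data (each key restarts at 0). A minimal sketch, assuming such a layout:

# Only valid if the index is sorted and non-overlapping across keys
# (e.g. spot_1: 0-999, spot_2: 1000-2199, ...); with overlapping
# indexes the resulting divisions would be wrong.
h5df_sorted = dd.read_hdf(file, "/spot_*", mode="r", sorted_index=True)
print(h5df_sorted.divisions)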
When calling map_partitions, you'll get the partition index through the partition_info keyword:
def func(partition, partition_info=None):
    print(partition_info)
    return partition
h5df.map_partitions(func).compute()
outputs:
None
{'number': 0, 'division': 0}
{'number': 1, 'division': 0}
{'number': 2, 'division': 0}
The first None is printed because Dask runs the function once with an empty DataFrame to infer the output metadata. So you might be able to use the 'number' entry of partition_info to infer your HDF key.
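Just as an illustration (not part of your original code), and assuming the partition order matches the order of keys returned by pd.HDFStore.keys() (something you'd want to verify on your data), a hypothetical mapping could look like:

with pd.HDFStore(file, "r") as h5store:
    hdf_keys = h5store.keys()  # e.g. ['/spot_1', '/spot_2', '/spot_3']

def func_with_key(partition, partition_info=None):
    if partition_info is not None:  # skip the metadata-inference call
        key = hdf_keys[partition_info['number']]  # hypothetical number -> key mapping
        print(f'partition {partition_info["number"]} maps to key {key}')
    return partition

h5df.map_partitions(func_with_key, meta=h5df).compute()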
But you can also do things differently, provided this is an embarrassingly parallel use case over all the partitions: you can just use the Delayed API:
def my_func_delayed(h5store, key):
    print(f'processing key {key}')
    data = h5store.get(key).copy()  # Make a copy in order to avoid modifying the input file.
    data['key'] = key  # Just an example of computation, but this will also help for the next example.
    return data
with pd.HDFStore(file, "r") as h5store:
    results = []
    for key in h5store.keys():
        results.append(dask.delayed(my_func_delayed)(h5store, key))
    dask.compute(*results)
outputs:
processing key /spot_2
processing key /spot_1
processing key /spot_3
This allows you to process each key independently, loading the data directly on Dask workers if you use Distributed. You can even see that the keys are not processed in sequential order.
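One caveat if you go through a Distributed cluster: an open HDFStore handle won't serialize nicely to the workers, so a common pattern (just a sketch, assuming the workers can see hdf_file.h5 on their filesystem) is to open the store inside the task instead:

from dask.distributed import Client

def my_func_on_worker(file, key):
    # Each task opens the file itself, so no open handle is shipped to the workers.
    with pd.HDFStore(file, "r") as h5store:
        data = h5store.get(key).copy()
    data['key'] = key
    return data

client = Client()  # local cluster; or Client('scheduler-address:8786') for an existing one
with pd.HDFStore(file, "r") as h5store:
    keys = h5store.keys()
results = [dask.delayed(my_func_on_worker)(file, key) for key in keys]
dask.compute(*results)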
If you really want to build a big Dask DataFrame and be sure to have the correct key, you can use the functions above and build a DataFrame from Delayed objects:
def my_func_map_partitions(partition):
    print(f'processing partition {partition.key[0]}')
    return partition

with pd.HDFStore(file, "r") as h5store:
    results = []
    for key in h5store.keys():
        results.append(dask.delayed(my_func_delayed)(h5store, key))
    ddf = dd.from_delayed(results)
    ddf.map_partitions(my_func_map_partitions, meta=ddf).compute()
outputs:
processing key /spot_1
processing key /spot_1
processing key /spot_2
processing key /spot_3
processing partition /spot_1
processing partition /spot_3
processing partition /spot_2
Again, the first output is the metadata inference; you can avoid it by manually telling Dask which metadata to use when calling dd.from_delayed(results).
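For instance, since all keys share the same layout here, you could build the meta yourself; a minimal sketch, assuming ten float columns plus the added 'key' column (adapt the dtypes to your real data):

# Empty DataFrame describing the output: 10 float columns plus the 'key' column.
meta = pd.DataFrame({i: pd.Series(dtype='float64') for i in range(10)})
meta['key'] = pd.Series(dtype='object')

with pd.HDFStore(file, "r") as h5store:
    results = [dask.delayed(my_func_delayed)(h5store, key) for key in h5store.keys()]
    ddf = dd.from_delayed(results, meta=meta)
    ddf.map_partitions(my_func_map_partitions, meta=ddf).compute()

With meta provided, the extra 'processing key /spot_1' line disappears because Dask no longer needs to compute the first Delayed object to figure out the columns and dtypes.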
Hope that helps.