So, this is my first time using Dask and maybe I'm doing something wrong, but I've been running into this issue consistently with the data I'm working with, and I'm left wondering why.
I'm loading data from a Parquet file; among its columns, the only one relevant to this issue is called "EventNr". In the data, each row represents a signal from a detector, so signals received in the same time window share the same value in the "EventNr" column. I want to select a random sample of EventNr values, so I need to retrieve all rows matching each chosen EventNr. This is the code I am using:
import dask.dataframe as dd
import numpy as np

ddf = dd.read_parquet(file_path, engine='pyarrow')
ddf = repartition(ddf)  # custom helper, defined below
events = np.random.choice(np.unique(ddf.EventNr.compute().values), size=10)
idx = ddf.index.compute()[np.isin(ddf.EventNr, events)].tolist()
ddf_sample = ddf.loc[idx]
I wrote this repartition function so that rows sharing the same EventNr value would never be split across different partitions:
def repartition(ddf, npartitions=10):
    # Evenly spaced EventNr boundaries between 0 and the maximum value
    max_enr = ddf.EventNr.max()
    divs_enr = np.floor(np.arange(npartitions + 1) / npartitions * max_enr).astype(np.int32)
    # Map each boundary EventNr to the first row index carrying that value
    divs_idx = tuple(map(lambda x: ddf.index.compute()[ddf.EventNr == x].min(), divs_enr))
    ddf_new = ddf.repartition(npartitions=npartitions)
    ddf_new.divisions = divs_idx  # overwrite the divisions Dask computed
    return ddf_new
When I run the code, everything seems fine until I call ddf_sample.compute(), which raises a KeyError saying that a bunch of indexes don't exist and cannot be retrieved from the dataframe.
This didn't sound right to me, so I tried inverting the order and running ddf.compute().loc[idx] instead. To my surprise, this didn't raise any exceptions and worked as intended, so I'm left wondering whether this is some kind of bug or I'm doing something wrong.
I'm using Dask precisely to avoid loading the whole dataframe into memory, so doing ddf.compute().loc[idx] seems counterproductive to me. I would really appreciate any help I can get : ).