DataFrame.loc[[...]].compute() raises KeyError while DataFrame.compute().loc[[...]] doesn't?

This is my first time using Dask, so maybe I’m doing something wrong, but I’ve been running into this issue consistently with the data I am working with and I am left wondering why.

I’m loading data from a Parquet file where, among the columns, only one is relevant to this issue, called “EventNr”. In the data, each row represents a signal from a detector, so signals received in the same time window share the same value of the “EventNr” column. I want to select a random sample of EventNr values, so I need to retrieve all the rows sharing each chosen EventNr value. This is the code I am using:

import dask.dataframe as dd
import numpy as np

ddf = dd.read_parquet(file_path, engine='pyarrow')
ddf = repartition(ddf)  # repartition is defined below


events = np.random.choice(np.unique(ddf.EventNr.compute().values),size=10)
idx = ddf.index.compute()[np.isin(ddf.EventNr,events)].tolist()

ddf_sample = ddf.loc[idx]

I wrote this repartition function so that rows sharing the same EventNr value would not be split across different partitions:

def repartition(ddf, npartitions=10):
  max_enr = ddf.EventNr.max()
  divs_enr = np.floor(np.arange(npartitions+1)/npartitions*max_enr).astype(np.int32)
  
  divs_idx = tuple(map(lambda x: ddf.index.compute()[ddf.EventNr==x].min(), divs_enr))
  ddf_new = ddf.repartition(npartitions=npartitions)
  ddf_new.divisions = divs_idx
  return ddf_new

When I run the code, everything seems fine until I try to run ddf_sample.compute(), which results in a KeyError saying that a bunch of the index labels don’t exist and cannot be retrieved from the dataframe.

This didn’t sound right to me, so I tried inverting the order and running ddf.compute().loc[idx] instead. To my surprise, this didn’t raise any exceptions and worked as intended, so I’m left wondering whether this is some kind of bug or I am doing something wrong.

I’m using Dask precisely to avoid loading the whole dataframe into memory, so doing ddf.compute().loc[idx] seems counterproductive to me. I would really appreciate any help I can get : ).

Hi @magnarex, welcome to Dask community!

It’s a bit hard to help without a complete reproducible example, but here are a few thoughts:

  • I’m not sure about your repartition function: you are not using the divisions kwarg, but just modifying the attribute after the repartitioning (see the sketch after this list).
  • What happens if you don’t repartition your data?
  • You are computing entire columns (EventNr and index) to select your sample, which is also a bit suboptimal and might lead to memory problems on a big dataset.

Label-based indexing in a distributed setting is always more complex than with plain Pandas: is your DataFrame correctly indexed, with known divisions? See Indexing into Dask DataFrames — Dask documentation.
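As an illustration of both points, here is a minimal sketch (the file path and the division values are placeholders, so adapt them to your data):

import dask.dataframe as dd

ddf = dd.read_parquet(file_path, engine='pyarrow')

# Quick diagnostic: .loc with a list of labels only works reliably when the
# divisions are known.
print(ddf.known_divisions)   # False means divisions are (None, None, ...)
print(ddf.divisions)

# Pass the partition boundaries to repartition() instead of overwriting the
# .divisions attribute afterwards; the values below are made up.
ddf = ddf.repartition(divisions=(0, 1000, 2000, 3000))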

Hi @guillaumeeb! The repartition function was my attempt to work around an issue that appeared when selecting rows from the data. I have the data stored in Parquet files, and upon reading them the divisions of the resulting DataFrame were always (None, None, ...), so I was trying to fix that by setting up new divisions with that function.

I’ve made some code to generate some fake data so that the issue is reproducible:

import numpy as np
import dask.dataframe as dd

nevents = 100
max_hits = 100

eventnr = []
for event in range(1,nevents+1):
    eventnr += [event]*np.random.randint(1,max_hits)

ddf = dd.DataFrame.from_dict({'EventNr' : eventnr}, npartitions=1)
ddf.to_parquet('test.parquet')
print(ddf.divisions)
ddf = dd.read_parquet('test.parquet')
print(ddf.divisions)

This results in:

(0, 5486) # The last number is random
(None, None)

With this code to replicate the dataframe, you should now be able to reproduce the error.

However, after your comment I revisited the repartition function and used the divisions kwarg as you suggested (I didn’t know that argument existed, so thank you for that (: ), and ran into a new error: “ValueError: left side of old and new divisions are different”. After digging a little, I found out that this error was happening because the divisions were (None, None, ...), so I tried to fix that and came up with a solution that seems to be working.

If I add this line after reading the parquet file:

ddf = ddf.reset_index().set_index('index',sorted=True)

The divisions property is now not None, but holds the values one would expect. With this, the updated repartition function runs without error:

def repartition(ddf, npartitions = 10):
    max_enr = ddf.EventNr.max()
    divs_enr = np.floor(np.arange(npartitions+1)/npartitions*max_enr).astype(np.int32)

    divs_idx = list(map(lambda x: ddf.index.compute()[ddf.EventNr==x].min(), divs_enr))
    divs_idx[0] = ddf.divisions[0]
    divs_idx[-1] = ddf.divisions[-1]
    ddf_new = ddf.repartition(divisions=tuple(divs_idx))

    return ddf_new

And the random selection now works as expected, so it was a problem with the reading of the parquet files all along (:

PS: I made some improvements to the sample selection and I think this is better now:

import dask.array as da

events = da.random.choice(ddf.EventNr.unique().to_dask_array(lengths=True), 10)
idx = ddf.EventNr.isin(events)
ddf_sample = ddf.loc[idx]

Although the isin step is a bit slow, if I’m not mistaken it shouldn’t load the whole dataframe into memory now (:

Edit: I’ve finally decided to use np.isin instead of the built-in Series version, since it computes faster (:
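A sketch of what that swap might look like (assuming the chosen events are first computed into a small NumPy array; the map_partitions wiring shown here is just one way to do it):

import numpy as np
import pandas as pd

# Materialise the (small) set of chosen events so each partition can test
# membership against a plain NumPy array.
chosen = np.asarray(events)   # or events.compute() if events is a dask array

# Apply np.isin partition by partition; wrapping the boolean array back into
# a Series keeps the index aligned for row selection.
idx = ddf.EventNr.map_partitions(
    lambda s: pd.Series(np.isin(s.values, chosen), index=s.index),
    meta=('EventNr', 'bool'),
)
ddf_sample = ddf.loc[idx]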

It’s a bit disappointing that you have to reset the index after reading the tables; ideally this should not be necessary, but I’m glad you found a solution.

Did you try the index or calculate_divisions kwargs of the read_parquet function?

This should be okay since you first reduce the dataset size with unique.

Thank you for your suggestion, I hadn’t tried that! I had to do some digging, since the index column read from the Parquet file isn’t named “index”, but I printed the file’s metadata and found that the index column is named “__null_dask_index__”. Now using:

ddf = dd.read_parquet(
    path,
    engine='pyarrow',
    index="__null_dask_index__",
    calculate_divisions=True
)

seems to fix the problem without resorting to reindexing, thank you! (:
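In case it helps anyone else, this is roughly how the metadata can be inspected with pyarrow to find that name (a sketch; the part-file name is just the default that dd.to_parquet writes):

import pyarrow.parquet as pq

# dd.to_parquet writes a directory of part files; read the schema of one of them.
schema = pq.read_schema('test.parquet/part.0.parquet')

# The pandas metadata stored in the file records which column is the index.
print(schema.pandas_metadata['index_columns'])
# e.g. ['__null_dask_index__']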
