Inconsistencies with Dask Columns & Indices

Hello there, I’m quite new to Dask, so I may be asking a trivial question, or I may simply be doing things horribly wrong.
So, I am working with a Dask DataFrame which has, as columns, ORBIT, chamber, plus a handful more.
Now, what I need to do is first group the Dask DataFrame by the columns ORBIT and chamber, then apply a function to the groups. At this point, I have to take the resulting Dask DataFrame, group it by ORBIT once again, and apply a second function. In short, this is what I am doing:

results_intermediate = ddf.groupby(['ORBIT', 'chamber'])\
                          .apply(func1, meta = meta1)\
                          .reset_index()

results_final = results_intermediate.groupby(['ORBIT'])\
                                    .apply(func2, axis = 1, meta = meta2)

And I get the following error:

KeyError: 'ORBIT'

suggesting that ORBIT is, indeed, not a column of results_intermediate.
What happens, apparently, is that when I call reset_index(), the columns ORBIT, chamber, and a new one created during the operation (for me it’s named level_2) are all collapsed into a single column named index. This odd behavior disappears once the result is a Pandas DataFrame: whenever I call .head() or .compute(), the contents of index are split back into the separate columns ORBIT, chamber, and level_2.
Since I have to access the column ORBIT from the Dask DataFrame (for the computation of results_final) I was wondering how I can solve this issue.
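For reference, here is a minimal plain-Pandas sketch of the two-step pipeline described above. The toy data, the value and total columns, and the body of func1 are all made up for illustration; they are not the real ones. It only shows where a level_2 column can come from when func1 returns a DataFrame with its own index.

```python
import pandas as pd

# Toy data; the "value" column is a hypothetical stand-in for the real ones.
df = pd.DataFrame({
    "ORBIT": [1, 1, 1, 2, 2],
    "chamber": [0, 0, 1, 0, 1],
    "value": [1.0, 2.0, 3.0, 4.0, 5.0],
})

def func1(group):
    # Hypothetical first step: return a one-row DataFrame per group.
    return pd.DataFrame({"total": [group["value"].sum()]})

# Step 1: group by both keys, apply, then move the index levels into columns.
# The unnamed third index level (the row index of func1's output) becomes "level_2".
intermediate = (
    df.groupby(["ORBIT", "chamber"])[["value"]]
      .apply(func1)
      .reset_index()
)
columns = list(intermediate.columns)
print(columns)

# Step 2: group by ORBIT again and aggregate the intermediate result.
per_orbit = intermediate.groupby("ORBIT")["total"].mean()
print(per_orbit)
```

In Pandas the ORBIT column is available for the second groupby, which is the behavior I would also expect from the Dask version.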

Thank you very much in advance!

Hi @L_Martin, welcome to the Dask community!

Okay, it took me some time, but I think I figured out the issue: the problem probably comes from how the meta argument is constructed. I built a small example where meta seemed to be inferred correctly, but whenever I tried to set it explicitly to give a real name to the output column, everything failed.

This github issue gave me the trick.

import dask
import pandas as pd
df = dask.datasets.timeseries()

def func1(partition):
    return partition.x.sum()

def func2(partition):
    return partition.Sum.mean()

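# meta describes the output of apply: an empty float Series named 'Sum'
# whose (empty) MultiIndex carries the names of the groupby keys.
meta = pd.Series(name='Sum', dtype='f8', index=pd.MultiIndex([[], []], [[], []], names=['name', 'id']))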
results_intermediate = df.groupby(['name', 'id'])\
                         .apply(func1, meta=meta)\
                         .reset_index()

results_final = results_intermediate.reset_index().groupby(['name'])\
                                    .apply(func2)

results_final.compute()

I’m not really certain meta should be that complicated to use here… Maybe you should open an issue with this reproducer.
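To illustrate why the index names in meta seem to matter, here is a small Pandas-only sketch. It compares the meta from the reproducer above with a meta that has a default index (the second one is a hypothetical comparison, not something taken from the original question). Dask uses meta to know the column names before computing anything, so my guess, not confirmed here, is that a meta without named index levels leads Dask to expect a single index column after reset_index(), hence the KeyError.

```python
import pandas as pd

# meta with a named, empty two-level index, as in the reproducer above.
named_index = pd.MultiIndex(levels=[[], []], codes=[[], []], names=["name", "id"])
meta_named = pd.Series(name="Sum", dtype="f8", index=named_index)

# meta with a default index (hypothetical, for comparison only).
meta_plain = pd.Series(name="Sum", dtype="f8")

# reset_index() turns index levels into columns, using their names when present.
named_columns = list(meta_named.reset_index().columns)
plain_columns = list(meta_plain.reset_index().columns)

print(named_columns)  # ['name', 'id', 'Sum']
print(plain_columns)  # ['index', 'Sum']
```

With the named index, the group keys show up as real columns in the metadata, so a later groupby on one of them can be resolved lazily.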

Thank you very much @guillaumeeb! Yeah, the behavior of meta seems to be very odd in this case. I tried some other combinations and it still didn’t work, but this time the error that was returned pointed much more clearly at the meta argument. I will make sure to add more details about the different errors I got.
Anyway, thank you again!


Hi @L_Martin, did you end up opening an issue or not yet? Do you intend to or should I do it? I think this at least needs some clarification in the documentation.

To be fair, I haven’t opened it yet. I am quite new to these things, so I am not sure how to do it properly, but I wouldn’t mind trying it out myself.

No problem, do not hesitate to try, and there is no hurry. You can ping me if you want. When you open the issue tracker on GitHub, you’ll be able to create a new issue of the Bug Report type, and you’ll see a template. Try to fill in all the parts if you can, but the reproducer above is the most important point!
