How to get groupby group names with Dask DataFrames

I am looking either to iterate over groups to get the name of each group, or to have an accessor for the group name inside a groupby.apply().

The first goal would be an answer like this one: python - Using groupby group names in function - Stack Overflow

# an equivalent to this in dask would be nice
df.groupby('class').apply(lambda x: x.name)

or

for nm, grp in df.groupby('class'):
    fn(grp, nm)

I can see how this might not be as straightforward, since the group name is not known until a potential repartition or shuffle.

If that is not possible, I could settle for something like

results = []
for cls in df['class'].unique():
    subset = df[df['class'] == cls]
    results.append(fn(subset))  # should this be delayed, or can I persist each subset and operate on it later?
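
For concreteness, the delayed version I have in mind would look roughly like this (fn is just a stand-in for whatever per-group function I end up running):

import dask

results = []
for cls in df['class'].unique().compute():
    subset = df[df['class'] == cls]  # lazy subset of the dask dataframe
    results.append(dask.delayed(fn)(subset, cls))  # defer fn on each subset

results = dask.compute(*results)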

Thanks for any help!

Hi @ljstrnadiii, thanks for the question! Using an example similar to the Stack Overflow question you linked, this works for me:

import pandas as pd
import dask.dataframe as dd

n = 20
df = pd.DataFrame({'user': ['a', 'b', 'c'] * n,
                   'value1': [1, 2, 1] * n,
                   'value2': [20, 10, 20] * n})
ddf = dd.from_pandas(df, npartitions=4)
# meta describes the output: a Series named 'user' with string values
ddf.groupby('user').apply(lambda x: x.name, meta=('user', str)).compute()
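
The computed result is a Series indexed by the group keys whose values are the group names 'a', 'b', and 'c', so x.name does resolve to each group's name here.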

I’m still looking into whether this will work for a broader set of cases, though. If you’re able to share an example of what didn’t work for you, and what the failure was, that would be great.


After looking into this a bit more @ljstrnadiii, it seems the .name accessor will only work if the number of partitions matches the number of groups and your dataframe is sorted by the groupby column. You can use sort_values to ensure both:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': [1, 2, 3] * 10, 'y': range(30)})
ddf = dd.from_pandas(df, npartitions=4)
# sort by the groupby column, with one partition per unique value of 'x'
ddf = ddf.sort_values('x', npartitions=ddf['x'].nunique().compute())
ddf.groupby('x').apply(lambda x: x.name, meta=('x', 'int64')).compute()
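
The idea is that sorting on x and then repartitioning to the number of unique values lines the partition boundaries up with the groups, so each partition holds a single group and the name seen inside the apply is that group's key.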

Thanks for looking into this!

Here is a way to reproduce:

from dask.distributed import Client
import numpy as np
import dask.dataframe as dd
import pandas as pd

def build_dd(n):
    cats = np.random.choice(['a', 'b', 'c'], n)
    years = np.random.choice([2001, 2002, 2003], n)
    data = np.random.random(n)
    df = pd.DataFrame({'features': cats, 'time': years, 'data': data})
    ddf = dd.from_pandas(df, npartitions=1)
    return ddf

client = Client(some_kubernetes_helm_based_cluster)

futures = client.map(build_dd, np.random.randint(1000, 10000, 100))
results = client.gather(futures)

df = dd.concat(results)
grpby = df.groupby(['features','time'])
grpby.apply(lambda x: x.name, meta={'time': str, 'ftr': str}).compute()

I think I was being lazy with the npartitions. I could use some best-practice advice for the operation I'm attempting here. Should I sort by multiple values on the concatenated dataframe, as you suggested? I seem to get a lot of tasks, and I'm guessing there is a lot of reshuffling going on. Should I also call sort_values with npartitions equal to the number of unique values in my build_dd function, to keep the task count low before the concat and help the groupby? Will try this now anyway.

@scharlottej13, thanks for all the help! And sorry for the delay; I got distracted with other stuff.


Also, is there a general rule of thumb for the choice of 4 here?

Reporting back with a NotImplementedError when sorting by multiple keys. Any known workarounds?

Thanks for providing more details @ljstrnadiii! I’m still working on the best way to do this, but will get back to you!

Reporting back with a NotImplementedError when sorting by multiple keys. Any known workarounds?

Indeed, sort_values doesn’t support multiple keys, so it seems my original suggestion won’t quite work for this use case.
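
One possible workaround, though I haven't tested it beyond a sketch, would be to collapse the two keys into a single sortable column and sort on that (df here is the concatenated dataframe from your reproduction, and the 'key' column name is just a placeholder):

# untested sketch: combine the two groupby keys into one sortable column
df['key'] = df['features'].astype(str) + '_' + df['time'].astype(str)
df = df.sort_values('key', npartitions=df['key'].nunique().compute())
df.groupby(['features', 'time']).apply(lambda x: x.name, meta=('name', object)).compute()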

Also, is there a general rule of thumb for the choice of 4 here?

The choice of 4 was mostly arbitrary 🙂


After trying a few more things, I think the best solution would be to submit a feature request to implement pandas' groups attribute in Dask. Then you could call .groups.keys() on the Dask dataframe groupby object to get the values you're looking for.
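
In the meantime, if all you need are the group keys themselves, a minimal sketch (using the df from your reproduction above) is to compute the unique combinations of the groupby columns directly:

# the distinct (features, time) pairs are exactly the groupby keys
keys = df[['features', 'time']].drop_duplicates().compute()
print([tuple(row) for row in keys.itertuples(index=False)])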

Another option for a workaround might be to use the index instead, which may be close to what you’re looking for?

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(
    {'x': [1, 2, 3] * 10, 'y': range(30), 'z': ['a', 'b', 'c'] * 10})
ddf = dd.from_pandas(df, npartitions=2)

# borrow the pandas result as meta so dask knows the output structure
meta = df.groupby(['x', 'z']).apply(lambda x: x.index.values)
ddf.groupby(['x', 'z']).apply(
    lambda x: x.index.values, meta=meta
).reset_index().compute()
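
Here reset_index() turns the group keys x and z back into columns, so the computed result has one row per group alongside the index values belonging to that group.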