User-defined types for groupby apply with p2p shuffling

It seems that p2p shuffling is not supported for a Dask DataFrame that contains columns of user-defined types. This limitation doesn’t apply to tasks-based shuffling, though. Here’s an example:

import dask
import dask.dataframe as dd
import pandas as pd

class Foo:
    def __init__(self, x):
        self.x = x
        
def make_df():
    return pd.DataFrame([[1, Foo(1)], [2, Foo(2)], [3, Foo(3)]], columns=['A', 'B'])

ddf = dd.from_delayed(dask.delayed(make_df)(), meta={'A': int, 'B': object})
ddf.groupby('A').apply(lambda df: len(df), meta=(None, int)).compute()

Here ddf contains a column B of type Foo, and as a result I can’t groupby and apply on ddf with p2p shuffling. I tried changing the meta for B to Foo but it still doesn’t work. Does there exist a way right now to do it?

Hi @yiliema,

Welcome to Dask community!

Just to be sure, what do you mean by you can’t groupby and apply with p2p shuffling? Do you get an error with the above code?

Yes, I ran it with several recent Dask versions and got the same error saying p2p shuffling failed. The root cause seems to be that p2p relies on PyArrow, which complains about user-defined types. What I’m not sure about is whether there’s a way to get around that. If not, then it’s probably an issue or a feature request.

Actually this only fails if the computation takes place on a cluster.

import dask
import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd

class Foo:
    def __init__(self, x):
        self.x = x
        
def make_df():
    return pd.DataFrame([[1, Foo(1)], [2, Foo(2)], [3, Foo(3)]], columns=['A', 'B'])

cl = Client()
ddf = dd.from_delayed(dask.delayed(make_df)(), meta={'A': int, 'B': object})
cl.gather(cl.compute(ddf.groupby('A').apply(lambda df: len(df), meta=(None, int))))

Running it locally as in my original post works fine, confusingly.

Yes, you are right, serialization only happens when using multiple processes.

You clearly identified the cause: PyArrow doesn’t know how to serialize your user-defined type. There might be some way to get around it, but I haven’t found it yet; maybe someone knows better than me!

cc @fjetter @crusaderky.

Sorry for the late reply. I believe this has been fixed now. At least it works for me with the latest release, 2023.12.1.


@fjetter Thanks for the reply. I think the example in the OP always runs fine; it’s when it runs on a cluster that it fails. I tried running with 2023.12.0 and it still fails. Sorry for the confusion.