Implementing custom lambda function in Dask

I have written a custom lambda function that needs to be applied after a groupby operation on a DataFrame. The lambda concatenates all **unique** strings within each group using a joiner such as ", ". I am trying to implement the same functionality with the Dask library for Python, but I get the error message shown below. Could anybody guide me on how to implement this lambda function in Dask?

Implemented in Pandas:

import pandas as pd

A = pd.DataFrame(data={"A": ["saad", "saad", "saad", "saad", "nimra", "asad", "nimra", "nimra", "asad"],
                       "B": ["hello", "hello", "saad", "whatsup?", "yup", "nup", "saad", "saad", "nup"],
                       "C": ["hello", "hello", "saad", "whatsup?", "yup", "nup", "saad", "saad", "nup"]})
# Unique values of column B only, joined per group
A.groupby("A")["B"].unique().apply(', '.join)
# Same idea applied to every non-key column
A.groupby("A").agg(lambda s: ', '.join(s.unique()))

This code works and produces the expected output (shown for the `agg` call, which handles both B and C):

A      B                      C
asad   nup                    nup
nimra  yup, saad              yup, saad
saad   hello, saad, whatsup?  hello, saad, whatsup?

Implemented in Dask:
I tried to implement it in Dask using this code:

import dask.dataframe as dd

# Same aggregation as in pandas, on a Dask DataFrame split into 2 partitions
A_1 = dd.from_pandas(A, npartitions=2)
A_1.groupby("A").agg(lambda s: ', '.join(s.unique()))

However, the following error occurs:
ValueError                                Traceback (most recent call last)
Cell In[20], line 1
----> 1 A_1.groupby("A").agg(lambda s: ', '.join(s.unique()))

ValueError: unknown aggregate lambda

Hi @smunir1994, welcome here!

Because Dask works in a distributed way, it applies your aggregation with map-reduce logic (first on each partition, then on the combined partial results), so the code is a little more complex: you have to define a custom aggregation with `dd.Aggregation`.
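To make the map-reduce point concrete, here is a plain-pandas sketch (no Dask involved) that mimics the steps on two hand-made partitions. The split into `partitions` and the helper names `chunk_step`, `agg_step` and `finalize_step` are hypothetical and only mirror the `chunk`, `agg` and `finalize` arguments of `dd.Aggregation`; Dask may split the rows differently. It shows why partial results are easier to merge as lists of unique values than as already-joined strings.

```python
import pandas as pd

# Toy data from the question (column C omitted for brevity).
df = pd.DataFrame({
    "A": ["saad", "saad", "saad", "saad", "nimra", "asad", "nimra", "nimra", "asad"],
    "B": ["hello", "hello", "saad", "whatsup?", "yup", "nup", "saad", "saad", "nup"],
})

# Hypothetical split into two partitions; Dask may choose different boundaries.
partitions = [df.iloc[:7], df.iloc[7:]]


def chunk_step(part: pd.DataFrame) -> pd.Series:
    # Per partition: keep each group's unique values as a list, not a string,
    # so that later steps can still deduplicate across partitions.
    return part.groupby("A")["B"].agg(lambda s: list(s.unique()))


def agg_step(partials: pd.Series) -> pd.Series:
    # Across partitions: regroup partial results by key and merge the lists,
    # skipping values already seen (first-seen order is preserved).
    def merge(lists: pd.Series) -> list:
        merged = []
        for values in lists:
            for v in values:
                if v not in merged:
                    merged.append(v)
        return merged

    return partials.groupby(level=0).agg(merge)


def finalize_step(merged: pd.Series) -> pd.Series:
    # Only at the very end, turn each list into a ", "-joined string.
    return merged.map(", ".join)


result = finalize_step(agg_step(pd.concat([chunk_step(p) for p in partitions])))
print(result)

# For contrast: joining strings already in the per-partition step hides the
# individual values, so "saad" (present in both partitions for "nimra") repeats.
naive_partials = pd.concat(
    [p.groupby("A")["B"].agg(lambda s: ", ".join(s.unique())) for p in partitions]
)
naive = naive_partials.groupby(level=0).agg(lambda s: ", ".join(s.unique()))
print(naive)
```

With this split, `result` matches the pandas output for column B, while `naive` gives "yup, saad, saad" for "nimra" unless a later step deduplicates again.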

Here is some code that works:

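import dask.dataframe as dd

custom_dask_agg = dd.Aggregation(
    # chunk: called on each partition's grouped column, one value per group
    chunk=lambda x: x.agg(lambda s: ', '.join(s.unique())),
    # agg: called on the concatenated chunk results, grouped again by key
    agg=lambda y: y.agg(lambda s: ', '.join(s.unique()))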