dataframe.groupby.Aggregation receives a dataframe populated with "foo" and 1

Hi - I have a use case for dataframe.groupby.Aggregation where I'm trying to concatenate a column whose values are lists of tuples; a full example is below. When I run the code, I noticed that the very first pass through the chunk/agg functions receives a dataframe populated with "foo" and 1 rather than my actual data. I put in some conditions to detect this case and got the aggregation working correctly, but I was wondering if there's a more efficient way around it. Or is the Aggregation method simply not appropriate for my use case?
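
For context, my understanding (happy to be corrected) is that this first pass is Dask's metadata inference: before touching any real partition, Dask runs the aggregation on a small dummy frame in which object columns are filled with "foo" and integer columns with 1, purely to work out the output schema. Roughly, the dummy input looks like this:

import pandas as pd

# Illustrative only: the shape of the dummy frame the chunk/agg
# functions see on the first (schema-inference) pass.
dummy = pd.DataFrame({"A": [1, 1], "B": ["foo", "foo"]})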

In this example the code runs fine with the guard in f_chunk (in f_agg the same transform happens to tolerate the dummy, so no guard is needed there), but in reality I'm dealing with much larger lists and far more complicated transformations than simple concatenation.
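
Because the real pipeline has several such aggregations, I've been factoring the dummy check into a small helper along these lines (my own code, not a dask API; the self-contained example below keeps the check inline):

def looks_like_meta(x):
    # Heuristic: Dask's inference pass fills object columns with "foo",
    # so a leading "foo" almost certainly means this is the dummy frame.
    return x.head(1).tolist() == ["foo"]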

from itertools import chain
from distributed import Client
import dask.dataframe as dd
import pandas as pd

# client = Client()

def f_chunk(x):
    # Metadata pass: x holds the "foo" dummy, so pass a single row
    # through untouched instead of trying to transform it.
    if x.head().tolist() == ["foo", "foo"]:
        return x.head(1)
    # Real data: within each group, concatenate the lists of tuples.
    return x.apply(lambda s: list(chain(*s.tolist())))

def f_agg(x):
    # No guard needed here: on the metadata pass the single "foo" string
    # chains into ['f', 'o', 'o'], which is still a valid object value
    # for schema inference, so the same transform covers both passes.
    return x.apply(lambda s: list(chain(*s.tolist())))

custom_agg = dd.Aggregation("custom_agg", f_chunk, f_agg)

pd_data = pd.DataFrame(
    {
        "A": [1, 1, 1, 2, 2, 2],
        "B": [
            [(12, 1), (3, 1), (4, 1)],
            [(3, 4), (1, 1)],
            [(1, 1), (5, 3)],
            [(3, 1), (2, 1)],
            [(12, 3), (2, 1)],
            [(3, 1), (5, 5), (6, 1)],
        ],
    }
)
data = dd.from_pandas(pd_data, npartitions=4)
data_grouped = data.groupby(["A"]).agg(D=pd.NamedAgg("B", custom_agg))
result = data_grouped.compute(scheduler="single-threaded")
result = result.reset_index()
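
For completeness, the only alternative I've found so far is to drop Aggregation and use groupby().apply() with an explicit meta, which skips the dummy-data pass entirely because Dask no longer needs to infer the output schema. My understanding, though, is that groupby-apply falls back to a full shuffle rather than the chunk/agg tree reduction that drew me to Aggregation in the first place. A sketch of what I mean, reusing the data above:

def concat_lists(s):
    # s is the complete pandas Series for one group -- real data only,
    # since meta is supplied explicitly below and no inference pass runs.
    return list(chain(*s.tolist()))

alt = (
    data.groupby("A")["B"]
    .apply(concat_lists, meta=("B", object))
    .compute(scheduler="single-threaded")
)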