ValueError: If using all scalar values, you must pass an index error message during aggregation of a Dask Dataframe using custom functions

I am always getting a Value Error whenever I run the below Python code to do aggregation of a Dask Dataframe using custom functions

Can someone please help point out what I could be doing wrong here?

Thank you

import dask.dataframe as dd
import pandas as pd

# Sample DataFrame
df = dd.from_pandas(pd.DataFrame({
    'group': ['A', 'A', 'B', 'B', 'B', 'C', 'C', 'C', 'C'],
    'value': [1, 'two', 2, 'three', 3, 'one', 2, 3, 'four']
}), npartitions=2)

# Custom aggregation class
class CountCondition:
    def __init__(self, condition):
        self.condition = condition

    def chunk(self, s):
        return s.apply(self.condition).sum()

    def agg(self, chunks):
        return chunks.sum()

    def finalize(self, x):
        return x

# Condition function to count integers
def condition_integers(x):
    return isinstance(x, int)

# Condition function to count strings
def condition_strings(x):
    return isinstance(x, str)

# Create instances of the custom aggregation class
count_integers = CountCondition(condition_integers)
count_strings = CountCondition(condition_strings)

# Create Aggregations
agg_integers = dd.Aggregation(
    name='count_integers',
    chunk=count_integers.chunk,
    agg=count_integers.agg,
    finalize=count_integers.finalize
)

agg_strings = dd.Aggregation(
    name='count_strings',
    chunk=count_strings.chunk,
    agg=count_strings.agg,
    finalize=count_strings.finalize
)

# Perform groupby and apply custom aggregations
result_integers = df.groupby('group').agg({'value': agg_integers})
result_strings = df.groupby('group').agg({'value': agg_strings})

# Compute the results
result_integers = result_integers.compute()
result_strings = result_strings.compute()

print(result_integers)
print(result_strings)

Hi @Damilola, welcome to Dask community!

Problem comes from this line:

As you can see in Dask documentation chunk is a function that

will be called with the grouped column of each partition. It can either return a single series or a tuple of series. The index has to be equal to the groups.

It takes a SeriesGrouBy into argument, and must return a Series, not a scalar. Here, your condition_* functions is applied on a Series, not on each row of a Series. Which makes some unwanted result.

I was able to make it work like this (disclaimer: there is probably some better way):

# Custom aggregation class
class CountCondition:
    def __init__(self, condition):
        self.condition = condition

    def chunk(self, s):
        return s.apply(self.condition)

    def agg(self, chunks):
        return chunks.sum()

# Condition function to count integers
def condition_integers(x):
    return isinstance(x, int)

# Condition function to count strings
def condition_strings(x):
    return isinstance(x, str)
    
# Condition function to count integeres in a Serie
def condition_integers_series(s):
    return s.apply(condition_integers).sum()

# Condition function to count strings in a Serie
def condition_strings_series(s):
    return s.apply(condition_strings).sum()

There are still typing problems on the initial build of the DataFrame (at least on my Dask Version), if you run into this, you can disable the automatic conversion to PyArrow strings:

from dask import config
config.set({"dataframe.convert-string": False})

Hi @guillaumeeb,

Thank you so much for the response.

Unfortunately, I am still unable to get it working with the changes that you have suggested.

I am still getting the

ValueError: If using all scalar values, you must pass an index

Can you please show the full Python code as well as the display output that you got when it worked for you?

This is the full Python code of what I have tried.

import dask.dataframe as dd
import pandas as pd

# Sample DataFrame
df = dd.from_pandas(pd.DataFrame({
    'group': ['A', 'A', 'B', 'B', 'B', 'C', 'C', 'C', 'C'],
    'value': [1, 'two', 2, 'three', 3, 'one', 2, 3, 'four']
}), npartitions=2)

# Custom aggregation class
class CountCondition:
    def __init__(self, condition):
        self.condition = condition

    def chunk(self, s):
        return s.apply(self.condition).sum()

    def agg(self, chunks):
        return chunks.sum()

    def finalize(self, x):
        return x

# Condition function to count integers
def condition_integers(x):
    return isinstance(x, int)

# Condition function to count strings
def condition_strings(x):
    return isinstance(x, str)

# Condition function to count integeres in a Serie
def condition_integers_series(s):
    return s.apply(condition_integers).sum()

# Condition function to count strings in a Serie
def condition_strings_series(s):
    return s.apply(condition_strings).sum()

# Create instances of the custom aggregation class
count_integers = CountCondition(condition_integers_series)
count_strings = CountCondition(condition_strings_series)

# Create Aggregations
agg_integers = dd.Aggregation(
    name='count_integers',
    chunk=count_integers.chunk,
    agg=count_integers.agg,
    finalize=count_integers.finalize
)

agg_strings = dd.Aggregation(
    name='count_strings',
    chunk=count_strings.chunk,
    agg=count_strings.agg,
    finalize=count_strings.finalize
)

# Perform groupby and apply custom aggregations
result_integers = df.groupby('group').agg({'value': agg_integers})
result_strings = df.groupby('group').agg({'value': agg_strings})

# Compute the results
result_integers = result_integers.compute()
result_strings = result_strings.compute()

print(result_integers)
print(result_strings)


# Print the dask and pandas versions
import dask
import pandas as pd

print("Dask Version:", dask.__version__)
print("Pandas Version:", pd.__version__)

Dask Version: 2024.10.0
Pandas Version: 2.2.2

Hi, I think the problem is in the chunk function of the aggreagation class. Here is my full code:

import dask.dataframe as dd
import pandas as pd
from dask import config
config.set({"dataframe.convert-string": False})

df = dd.from_pandas(pd.DataFrame({
    'group': ['A', 'A', 'B', 'B', 'B', 'C', 'C', 'C', 'C'],
    'value': [1, 'two', 2, 'three', 3, 'one', 2, 3, 'four']
}), npartitions=2)

# Custom aggregation class
class CountCondition:
    def __init__(self, condition):
        self.condition = condition

    def chunk(self, s):
        return s.apply(self.condition)

    def agg(self, chunks):
        return chunks.sum()

# Condition function to count integers
def condition_integers(x):
    return isinstance(x, int)

# Condition function to count strings
def condition_strings(x):
    return isinstance(x, str)
    
# Condition function to count integeres in a Serie
def condition_integers_series(s):
    return s.apply(condition_integers).sum()

# Condition function to count strings in a Serie
def condition_strings_series(s):
    return s.apply(condition_strings).sum()

# Create instances of the custom aggregation class
count_integers = CountCondition(condition_integers_series)
count_strings = CountCondition(condition_strings_series)

# Create Aggregations
agg_integers = dd.Aggregation(
    name='count_integers',
    chunk=count_integers.chunk,
    agg=count_integers.agg
)

agg_strings = dd.Aggregation(
    name='count_strings',
    chunk=count_strings.chunk,
    agg=count_strings.agg
)

# Perform groupby and apply custom aggregations
result_integers = df.groupby('group').agg({'value': agg_integers})
result_strings = df.groupby('group').agg({'value': agg_strings})

# Compute the results
result_integers = result_integers.compute()
result_strings = result_strings.compute()