Custom aggregation of dask dataframe

I am using Dask DataFrame's df.groupby().agg() with a custom aggregation function (a weighted average) defined via dd.Aggregation.

import pandas as pd
import dask.dataframe as dd

data = {'PostCode': ['A', 'A', 'B', 'B'], 'EmployerSize': [100, 200, 300, 400], 'DiffMeanHourlyPercent': [0.1, 0.2, 0.3, 0.4]}

df = pd.DataFrame(data)
ddf = dd.from_pandas(df, 2)

def chunk(chunk):
    # per-partition step: weighted sum and total weight for each group
    def weighted_func(df):
        return (df["EmployerSize"] * df["DiffMeanHourlyPercent"]).sum()
    return (chunk.apply(weighted_func), chunk.sum()["EmployerSize"])

def agg(total, weights):
    # combine the per-partition results
    return (total.sum(), weights.sum())

def finalize(total, weights):
    # divide the weighted sum by the total weight
    return total / weights

extent = dd.Aggregation('extent', chunk, agg, finalize=finalize)
ddf.groupby("PostCode")['EmployerSize', 'DiffMeanHourlyPercent'].agg(extent).compute()

I am using Dask 2024.3.1 and pandas 2.2.1.
Shouldn't the input of the chunk function be a DataFrameGroupBy? Right now it is a SeriesGroupBy, so chunk.sum()["EmployerSize"] throws an exception.

I actually got this code from an open-source book.

I guess both Dask and pandas have been updated, so the older code no longer works.

Any ideas on how to handle this situation?

Hi @luweizheng,

Thanks for sharing this problem with a proper reproducer. I can confirm that it worked (with some FutureWarnings) with a previous version (2024.2.0), but not since 2024.3.0. I also tried to disable the new query planning feature (dask.config.set({"dataframe.convert-string": False})), but encountered another error.

I’m not sure what changed, or whether this is expected, so cc @crusaderky @hendrikmakait.

I’ve tried with packages as old as dask==2023.1.0 and pandas==1.4, and from what I’m seeing, agg has always operated independently on each column.
This is consistent with the behaviour in pandas. I can’t see anything in the documentation of either pandas or Dask that suggests it should be capable of performing a multi-column reduction.

What you want to implement doesn’t need complicated map-reduce paradigms either. You can achieve it in a much simpler way with

import pandas as pd
import dask.dataframe as dd

data = {'PostCode': ['A', 'A', 'B', 'B'], 'EmployerSize': [100, 200, 300, 400], 'DiffMeanHourlyPercent': [0.1, 0.2, 0.3, 0.4]}

df = pd.DataFrame(data)
ddf = dd.from_pandas(df, 2)

# weight each row, then reduce to plain per-group sums
ddf["Weighted"] = ddf["EmployerSize"] * ddf["DiffMeanHourlyPercent"]
ddf = ddf.groupby("PostCode").sum()
# weighted average = sum of weighted values / sum of weights
ddf = ddf["Weighted"] / ddf["EmployerSize"]
ddf.compute()

output:

PostCode
A    0.166667
B    0.357143
dtype: float64

Thanks for your reply!

I just wanted to confirm how to perform custom aggregation within Dask.

In pandas, when using df.groupby("PostCode"), we get a DataFrameGroupBy object. We can select certain columns from the DataFrameGroupBy, for example by writing df.groupby("PostCode")['EmployerSize', 'DiffMeanHourlyPercent']. Of course, since pandas 2 this raises an error, as pandas requires double square brackets instead.

So I’m guessing that with a Dask DataFrame, ddf.groupby("PostCode") executes the groupby on each partition’s pandas DataFrame, producing a DataFrameGroupBy object for that partition. The agg function is then used to aggregate across the DataFrameGroupBy objects of the partitions.
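For reference, here is a minimal pandas sketch of that double-square-bracket selection, using the same toy data as above:

import pandas as pd

df = pd.DataFrame({'PostCode': ['A', 'A', 'B', 'B'],
                   'EmployerSize': [100, 200, 300, 400],
                   'DiffMeanHourlyPercent': [0.1, 0.2, 0.3, 0.4]})

# pandas 2 expects a list of labels (double square brackets) when selecting
# several columns from a groupby; the tuple form above now raises an error.
grouped = df.groupby("PostCode")[["EmployerSize", "DiffMeanHourlyPercent"]]
print(type(grouped))   # DataFrameGroupBy
print(grouped.sum())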

Also, as @guillaumeeb said, Dask version 2024.2.0 works correctly for this.

I’m not sure if my understanding is right:

The chunk function is applied to each partition of the Dask dataframe in parallel. Its result is then combined with the results from other partitions using the agg function. Finally, the finalize function is applied to the result of the aggregation.

In pandas, when using df.groupby("PostCode") , we get a DataFrameGroupBy object.

Yes, and it’s the same in dask.

The agg function is then used to aggregate across multiple partitions’ DataFrameGroupBy objects.

Yes, and just like in pandas, the various functions are applied individually to each column.
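To make that concrete, here is a minimal sketch of a custom aggregation that works precisely because every step only ever sees a single column. It follows the custom_max example from the dd.Aggregation documentation rather than the weighted average from the question:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'PostCode': ['A', 'A', 'B', 'B'],
                   'EmployerSize': [100, 200, 300, 400],
                   'DiffMeanHourlyPercent': [0.1, 0.2, 0.3, 0.4]})
ddf = dd.from_pandas(df, npartitions=2)

# chunk and agg each receive a SeriesGroupBy for one column at a time, so the
# aggregation runs separately on 'EmployerSize' and 'DiffMeanHourlyPercent'.
custom_max = dd.Aggregation(
    name='custom_max',
    chunk=lambda s: s.max(),   # max within each partition, per group
    agg=lambda s0: s0.max(),   # max of the per-partition maxima
)

print(ddf.groupby("PostCode")[['EmployerSize', 'DiffMeanHourlyPercent']].agg(custom_max).compute())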

The chunk function is applied to each partition of the Dask dataframe in parallel. Its result is then combined with the results from other partitions using the agg function. Finally, the finalize function is applied to the result of the aggregation.

Correct. What may not be obvious is that there can be multiple recursive applications of agg, depending on your split_every configuration.
Since split_every=8 by default, if the DataFrame you group over is made of 500 partitions, you’ll have

  1. 500 chunk calls, one for each partition
  2. ceil(500/8) = 63 agg calls, one every 8 outputs of step 1
  3. ceil(63/8) = 8 agg calls, one every 8 outputs of step 2
  4. ceil(8/8) = 1 agg call, one every 8 outputs of step 3
  5. a single finalize call, on the output of step 4

If you have more than 1 column, each chunk, agg, and finalize call is repeated individually on each column.
The above is repeated separately on each group.
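As a small sketch of how to control that tree, split_every can also be passed to agg directly. The value 2 below is arbitrary, chosen just to make the extra agg levels visible; with it, each agg call combines at most two outputs of the previous level:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'PostCode': ['A', 'A', 'B', 'B'],
                   'EmployerSize': [100, 200, 300, 400],
                   'DiffMeanHourlyPercent': [0.1, 0.2, 0.3, 0.4]})
ddf = dd.from_pandas(df, npartitions=4)

# With 4 partitions and split_every=2: 4 chunk-level results are combined by
# 2 agg calls, whose outputs are combined by 1 final agg call.
result = ddf.groupby("PostCode").agg("sum", split_every=2)
print(result.compute())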


Thanks a lot!

Honestly, some changes in Dask 2024.3 may have broken existing APIs. Besides this issue, I’ve also run into several other problems, for example with groupby, which is a very commonly used pandas API. It seems these issues are mainly related to optimizations within dask-expr.

Please open tickets for any regressions you may find at Issues · dask/dask · GitHub