How groubyied Dataframe works?

Hi, How partitioning in the groupby works in the Dask?
Let’s say that I have pretty huge dataframe, and I want to use statsmodels

Problem with statsmodels is that, it supports only Pandas dataframe. As I understand If I convert Dask DF to Pandas DF, it means that from cluster perspective all data from the Nodes is gathered into one point (main node)…

So, my idea is that I will apply the groupby function (or other function) on Dask DF and the Grouped DF will be converted to Pandas DF and then used in statsmodels?

The question is, can someone elaborate how the DF will behave in that case? Maybe someone has a better idea?

Hi @Richard, welcome here!

True! To be really precise, it is gathered on the node where the you launched the computation (where the Client is if using Distributed).

What you are looking for is this:

Feel free to ask if you have more question or if I did not understand correctly.

Hi @guillaumeeb

Thank you very much for reply.

From the example:

def group_test(data_ddf):
    pandas_df = data_ddf.compute() # Where it is executed?
   # other operations

df.groupby('col').apply(group_test, meta=object).compute()

The data for the pandas_df are also collected on the main node?
As I understand Dask has row based partitioning, so If yes (collected on main node), why it is happening? Why cannot be in the collect on other (worker) nodes?

Or is there any comparison between Dask groupby and Spark UDF?

I mean I am looking for similar answer but for Dask :slight_smile: pyspark - Does the User Defined Functions (UDF) in SPARK works in a distributed way? - Stack Overflow

I think your example should look like that:

def group_test(panda_df):
    # The input to the function is a Pandas dataframe, because it's only one partition of the Dask Dataframe
    # other operations

df.groupby('col').apply(group_test, meta=object).compute()

So no the group_test function will be applied on Worker nodes, each group_test task will take a Pandas Dataframe into input, this Pandas DataFrame being one partition of the Dask DataFrame.

So it’s not collected, the partition is just loaded onto a Worker node for processing.

So yes, this operation will work on a distributed way!

However, your last line with apply().compute() will gather all the result in the main node. You might prefer to just write the results on a file system or object store from the workers, using options described here:

Thank you very much :slight_smile:

1 Like