Dask groupby and getting the unique column count is taking a lot of time

Hi,
I have a dataframe with the following columns: {A, B, C, D, E, F}.
I am trying to group by the A and B columns and find the number of unique values in C for each group.

Following is the code:

grouped = ddf.groupby(['A', 'B']).C.nunique().to_frame('count').compute()

At the end I want these columns in the grouped dataframe: {A, B, C, D, E, F, count}

This code has been running forever. How can I optimize the logic?

Hi @blackcupcat,

Do you have a way to reproduce your example code at a small scale? Could you generate some data?
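
For example, something along these lines would be enough to reproduce the shape of the problem at a small scale (the sizes and value ranges below are made up for illustration):

import numpy as np
import pandas as pd
import dask.dataframe as dd

# Small synthetic frame with the same column names; the values are random
# and only meant to exercise the groupby/nunique at a small scale.
n = 100_000
pdf = pd.DataFrame({
    'A': np.random.randint(0, 100, n),
    'B': np.random.randint(0, 50, n),
    'C': np.random.randint(0, 1000, n),
    'D': np.random.random(n),
    'E': np.random.random(n),
    'F': np.random.random(n),
})
ddf = dd.from_pandas(pdf, npartitions=10)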

Did you use the Dask Dashboard to get some insight into what was happening?
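
If you are not already running a distributed Client, a quick way to get a Dashboard link on a local machine is the following sketch (defaults left as-is; tune them to your machine):

from dask.distributed import Client

# A local cluster; the dashboard link can be opened in a browser to watch
# tasks, memory usage, and worker activity while the computation runs.
client = Client()
print(client.dashboard_link)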

The line of code looks good to me, but be careful with the final compute(): do you have enough memory to hold the resulting DataFrame?

Hi,
I have been checking the progress of the computation.
I get an IO timeout after it reaches 4%:

[# ] | 4% Completed | 78m 2ssss

IOStream.flush timed out

Additional details:

  • I am running this code on a SageMaker notebook on a p3.2xlarge machine.
  • I have a 500 GB volume attached.
  • The current dataframe has around 25 million rows split across 10 partitions.
  • Please note that I have other queries that I run on this data, and these work fine and complete in around an hour:
ddf.groupby('B').apply(lambda x: x.sort_values('D'), meta=ddf._meta).compute().reset_index()

Do you have a more complete stack trace? Does it come from the client? Do you have some worker logs too?

Are you able to open the Dask Dashboard, and do you see tasks being computed?

Hi,

The most likely culprit is that nunique reduces your DataFrame to a single partition before counting the unique rows, which can blow up your memory if you are unlucky. You can avoid this by setting split_out=10 to keep the original number of partitions. We are in the process of changing this default to make it more reliable.
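
For illustration, here is a rough sketch of that adjustment, plus one way to merge the per-group count back onto the original columns so you end up with A, B, C, D, E, F and count (the merge step is an assumption based on the output you described, not something from your original code):

# ddf is the existing Dask DataFrame with columns A-F from above.
# split_out keeps the aggregation spread across ~10 output partitions
# instead of collapsing everything into one before counting uniques.
counts = (
    ddf.groupby(['A', 'B'])
       .C.nunique(split_out=10)
       .to_frame('count')
       .reset_index()
)

# Join the per-group counts back onto the original rows to keep all columns.
result = ddf.merge(counts, on=['A', 'B'], how='left').compute()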