Why does Dask take a long time to compute regardless of the size of the DataFrame and the number of partitions?

Why does a Dask DataFrame take a long time to compute regardless of the size of the DataFrame and the number of partitions, and how can I avoid this from happening?

Problem -

I’m currently working on AWS SageMaker with an ml.c5.2xlarge instance type, and the data is in an S3 bucket. I could not connect to a client: when I ran the client through a local cluster, I got this error → AttributeError: 'MaterializedLayer' object has no attribute 'pack_annotations'
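
For reference, the local-cluster attempt was roughly along these lines (just a sketch, not the exact code that raised the error):

from dask.distributed import Client, LocalCluster

# Roughly how the local cluster was being set up; the AttributeError appeared here
cluster = LocalCluster()
client = Client(cluster)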

So, I proceeded without connecting to anything specific, and it is therefore running on the defaults (Cluster, Workers: 4, Cores: 8, Memory: 16.22 GB).

# df is a Dask DataFrame read from the data in S3
shape = df.shape
nrows = shape[0].compute()   # .compute() triggers the actual computation
print("nrows", nrows)
print(df.npartitions)

I tried to perform compute on 24,700,000 records (~25M) with 23 partitions, and the time taken to execute was: CPU times: user 4min 48s, sys: 12.9 s, total: 5min 1s; Wall time: 4min 46s

For 5,120,000 rows (~5M) with 23 partitions, the time taken to execute was: CPU times: user 4min 50s, sys: 12 s, total: 5min 2s; Wall time: 4min 46s

For 7,697,351 rows (~7M) with 1 partition, the time taken was: CPU times: user 5min 4s, sys: 10.6 s, total: 5min 14s; Wall time: 4min 52s

I performed the same operations in pandas with 7,690,000 rows (~7M), and the time taken to execute was: CPU times: user 502 µs, sys: 0 ns, total: 502 µs; Wall time: 402 µs

I’m just trying to find the shape of the data, but in Dask, regardless of the type of operation, a single compute operation takes the same amount of time.

May I know what the reason behind this is, and what I can do to avoid it?

@jhanv Welcome to Discourse!

Just to note, I see you asked this on Stack Overflow as well.

Dask isn’t aware of the shape of your DataFrame. In fact, it just knows the number of “partitions”. So, the number of elements in each partition is only calculated when you call .compute(), which does take some time. This is one of the reasons why it’s recommended to call .compute() or .persist() very minimally, towards the end of your workflow.

Here’s an example task graph:

import pandas as pd
import dask.dataframe as dd
from distributed import Client

client = Client()

df = pd.DataFrame({
    'a': range(1000),
    'b': range(1000),
})
ddf = dd.from_pandas(df, npartitions=2)

# Visualize the task graph for computing the number of rows
ddf.shape[0].visualize()
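
Continuing from that snippet, here's roughly what you'd see if you inspect things before and after computing (a sketch; the exact Delayed key will differ):

# The column count is known up front, but the row count is a lazy object
print(ddf.shape)               # roughly: (Delayed('int-...'), 2)
print(ddf.npartitions)         # 2

# Only at this point does Dask go through every partition to get the row count
print(ddf.shape[0].compute())  # 1000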

Let me know if this helps!

Thanks for answering. I understand that it’s recommended to call .compute() or .persist() very minimally.

So, are you saying that since it is running on a CPU, Dask is performing computations sequentially across all the partitions, and that's why it is taking some time?

If that is not the case, then Dask should be performing computations in parallel across all the partitions, so it takes the same amount of time for the computation to occur on one partition vs. many partitions (assuming there is the same amount of data in each partition).
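
As a side check on that assumption, I believe something like this would show how many rows each partition actually holds (a sketch, reusing df from the original post):

# map_partitions runs len() on each partition, giving one count per partition
rows_per_partition = df.map_partitions(len).compute()
print(rows_per_partition)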

Whichever is the case, can I know why Dask is performing poorly (even with 1 partition) compared to pandas?

Note: Please refer to the original post for details about the data.

Can you give me more insight into this issue, so that I can have a clear understanding?