Performance of Dask DataFrames for Feature Engineering

I’m evaluating Dask for some feature engineering tasks on a single node (not a cluster) with 4 CPUs and 16GB RAM. My datasets are 1GB and 3GB in size.
Unfortunately, I see Pandas and Polars outperforming Dask significantly.

My question: are there any best practices to optimize Dask for things like group-by, binning, or missing-value imputation? I want to make sure I didn’t miss anything in my setup.

Example:

from dask.distributed import LocalCluster
cluster = LocalCluster()
import dask.dataframe as dd
transactions = dd.read_csv(
    'transactions.csv',
    parse_dates=['order_date', 'purchase_date', 'shipping_date',
                 'estimated_delivery_date', 'delivery_date', 'invoice_date'],
    dtype={'order_id': 'Int32', 'product_id': 'Int32', 'customer_id': 'Int32',
           'payment_type': 'category', 'order_status': 'category',
           'invoice_no': 'Int32'},
)
transactions['delivery_date_vs_estimation'] = (transactions.delivery_date - transactions.estimated_delivery_date)
transactions.groupby('product_id').delivery_date_vs_estimation.mean().compute()

The dtype of product_id is Int32, and delivery_date contains datetime64[ns] values.

This takes approx. 55 seconds in Dask, 5 seconds in Polars, and less than 1 second in Pandas. I verified that the LocalCluster uses all cores of my machine. I also changed the blocksize for the CSV, but that didn’t make any difference.
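
For reference, the blocksize experiment was along these lines (the value here is just illustrative; the other read_csv arguments were identical to the code above):

import dask.dataframe as dd

# Illustrative blocksize value (64 MB partitions instead of the default)
transactions = dd.read_csv(
    'transactions.csv',
    blocksize='64MB',
    parse_dates=['order_date', 'purchase_date', 'shipping_date',
                 'estimated_delivery_date', 'delivery_date', 'invoice_date'],
    dtype={'order_id': 'Int32', 'product_id': 'Int32', 'customer_id': 'Int32',
           'payment_type': 'category', 'order_status': 'category',
           'invoice_no': 'Int32'},
)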

Any suggestions appreciated!

Hi @Larissa, welcome to the forum.

Well, if your data fits in RAM, which seems to be the case here, Pandas or Polars outperforming Dask can totally happen depending on your workflow.

Aside from the fact that I don’t see a Client object created in your code, the DataFrame part of your code looks good (well, you could read fewer columns for this example, but I guess you know that). Do you use exactly the same code when comparing to Pandas?

I’m still surprised by the huge difference. Does the Pandas time include loading the data into memory?

To better help you, we would need a fully reproducible example. Could you generate some fake data to play with?

1 Like

Hi @guillaumeeb,

Thank you for your answer!

I used the same code for Pandas. The Pandas time does not include loading the data into memory. If I take the time Pandas needs to load the data into memory into account, Dask was actually faster in this experiment. Still, for one-hot encoding for instance, Dask was slower than Pandas even when the CSV reading time is included.

Surprisingly, in my experiment the CPU time / wall time ratio was only 1.1, although 4 CPUs were provided…
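
For context, a one-hot encoding step in Dask looks something like this (a sketch, not the exact code I ran; Dask’s get_dummies needs categorical columns with known categories, hence the categorize() call):

import dask.dataframe as dd

# Make the categories known, then one-hot encode the categorical columns.
transactions = transactions.categorize(columns=['payment_type', 'order_status'])
encoded = dd.get_dummies(transactions, columns=['payment_type', 'order_status'])
result = encoded.compute()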

I can generate data if that helps you :smiley:

Okay, that sounds more understandable.

That would be very helpful to better understand where the limitation might be. But again, when the data fits in memory, it is not necessarily surprising that Pandas alone performs better.

Hi @guillaumeeb,

Thank you :smiley:. How can I send you the generated dataset?

The best thing to do would be to provide the code that generates the data. If that is not possible, then try to share it on a public site like Google Drive or similar.

Here is the code to generate a 1 GB transactions CSV file:

import numpy as np 
import datetime as dt
import random
import csv

def generateDatesAndStatus():
    # Random order timestamp between 2019 and 2022
    ORDER = (dt.datetime(random.randint(2019, 2022), random.randint(1, 12), random.randint(1, 28))
             + dt.timedelta(days=random.randint(1, 4))
             + dt.timedelta(hours=random.randint(0, 23))
             + dt.timedelta(seconds=random.randint(0, 59)))
    deltaOrderShipping = dt.timedelta(minutes=random.randint(50, 200))
    # Purchase date: half the time equal to the order date, otherwise a bit later or missing
    if random.randint(1, 2) % 2 == 0:
        PURCHASE = ORDER
    else:
        PURCHASE = random.choice([ORDER + dt.timedelta(seconds=random.randint(1, 200)),
                                  ORDER + dt.timedelta(seconds=random.randint(1, 200)), ""])
    # Shipping date is missing in roughly one out of three rows
    SHIPPING = random.choice([ORDER + deltaOrderShipping, ORDER + deltaOrderShipping, ""])
    if PURCHASE == "":
        INVOICE = ORDER + dt.timedelta(seconds=random.randint(1, 200))
    else:
        INVOICE = random.choice([PURCHASE + dt.timedelta(seconds=random.randint(12, 120)), ""])
    # Delivery and estimated delivery depend on whether the order was shipped
    if SHIPPING == "":
        DELIVERY = ""
        ESTIMATEDDELIVERY = ORDER + dt.timedelta(hours=random.randint(20, 300))
    else:
        DELIVERY = ORDER + dt.timedelta(hours=random.randint(20, 300))
        ESTIMATEDDELIVERY = DELIVERY + dt.timedelta(hours=random.randint(-12, 30))
    # Order status depends on whether shipping/delivery dates exist
    if SHIPPING == "":
        STATUS = "out of stock" if DELIVERY == "" else "lost"
    else:
        STATUS = random.choice(["refund", "payment not transferred",
                                "success", "success", "success", "success", "success"])
    return SHIPPING, ORDER, PURCHASE, ESTIMATEDDELIVERY, DELIVERY, STATUS, INVOICE


def createTransactionsData(length, productsLength, customersLength):
    header=['order_id', 'product_id', 'customer_id', 'payment_type', 'shipping_date', 'order_date', 'purchase_date', 
            'estimated_delivery_date', 'delivery_date', 'order_status', 'invoice_no', 'invoice_date'] 
    orderIds = np.arange(0, length)
    productIds = np.arange(0, productsLength)
    customerIds = np.arange(0, customersLength)
    invoice_no = np.arange(0, length)
    paymentTypes = ["credit", "paypal", "klarna", "direct credit", ""]
    size = range(0,length)
    # newline='' is recommended by the csv module to avoid extra blank lines on Windows
    with open('transactions.csv', 'w', encoding='UTF8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for element in size:
            DATES = generateDatesAndStatus()
            PRICEBOUGHT = round(random.uniform(0.50, 600.00), 2)  # generated but not written to the CSV
            writer.writerow([orderIds[element], random.choice(productIds), random.choice(customerIds),
                             random.choice(paymentTypes), DATES[0], DATES[1], DATES[2], DATES[3],
                             DATES[4], DATES[5], invoice_no[element], DATES[6]])
        # no explicit f.close() needed: the with block closes the file

createTransactionsData(7355712, 12088310, 28252490)

For 3 GB, please use:

createTransactionsData(22067139, 36264934, 86199232)

I just tried your example. Here are the results I got:

  • Pandas: 61s
  • Dask Dataframe with default threaded Scheduler: 57s
  • Dask Dataframe with Distributed (LocalCluster of 4 workers): 30s

I noticed one thing in your first post: you don’t create a Client object. If this is the real code you executed, then you must have been using the threaded scheduler.

You should do something like:

from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
client = Client(cluster)

Next thing is, as noted above, your code is mainly IO bound: 1 second for the computation in Pandas, but 60s to load the data… The threaded scheduler doesn’t seem to improve data loading time, I’m not sure why. In distributed mode, independent processes load the data concurrently and are able to saturate my SSD bandwidth. Looking at the Dask Dashboard, I can see that it takes about 20-25s to load all the data, and then there is a reduction operation between workers for the group-by.
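
If you want to check this on your side, here is a rough way to separate loading time from groupby time (a sketch, assuming a distributed Client is running): persist the data first, wait for it, then time the groupby alone.

import time
from dask.distributed import wait

t0 = time.time()
transactions = transactions.persist()   # start loading the CSV into worker memory
wait(transactions)                       # block until all partitions are loaded
print("load:", time.time() - t0, "s")

t0 = time.time()
result = transactions.groupby('product_id').delivery_date_vs_estimation.mean().compute()
print("groupby:", time.time() - t0, "s")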

To me this all looks good: Dask accelerates things a bit, but on a laptop with a single SSD and an IO-bound workflow, it can’t divide the time taken by 4.

1 Like

Reading CSV data with pandas runs at about 50 MB/s. A groupby aggregation runs at about 1 GB/s. You’re likely to be entirely bound by CSV reading. This computation is GIL-bound, so processes are recommended.
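
For example (a sketch; the worker counts are just a guess for a 4-CPU machine), either a process-based LocalCluster or the multiprocessing scheduler:

from dask.distributed import LocalCluster, Client

# Process-based workers with one thread each, so CSV parsing isn't serialized by the GIL.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

# Or, without distributed, use the multiprocessing scheduler for a single compute:
result = transactions.groupby('product_id').delivery_date_vs_estimation.mean().compute(
    scheduler='processes'
)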

If you want to optimize for speed, though, the biggest improvement you could make isn’t about which library you use (pandas/dask/polars/spark/…); it’s about the file format. I recommend taking a look at Parquet instead of CSV.
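
A rough sketch of what that could look like (assuming pyarrow is installed): convert the CSV to Parquet once, then read the Parquet dataset, selecting only the columns you need, in later runs.

import dask.dataframe as dd

# One-off conversion from CSV to Parquet.
transactions = dd.read_csv(
    'transactions.csv',
    parse_dates=['order_date', 'purchase_date', 'shipping_date',
                 'estimated_delivery_date', 'delivery_date', 'invoice_date'],
    dtype={'order_id': 'Int32', 'product_id': 'Int32', 'customer_id': 'Int32',
           'payment_type': 'category', 'order_status': 'category',
           'invoice_no': 'Int32'},
)
transactions.to_parquet('transactions.parquet', engine='pyarrow')

# Later runs read the columnar Parquet dataset, and only the columns they need.
transactions = dd.read_parquet(
    'transactions.parquet',
    columns=['product_id', 'delivery_date', 'estimated_delivery_date'],
)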

1 Like

Thank you, I only had

cluster = LocalCluster()

in my code.