Perform the same operation on all columns of a dask dataframe in parallel

Hello,

TLDR: Best practice for parallelizing the same task on each column in a dask dataframe.

I have to perform the same operation on each column of a dask dataframe. I’m still new to dask, but I would have assumed this is a perfect use case for parallelization, since the per-column operations could all run at the same time.

I was originally using dask delayed for this until I saw this Stack Overflow post explaining that you shouldn’t mix delayed with other dask collections. I also noticed that the dask series I was using became a regular pandas series whenever I passed it to a delayed function.

I also saw this post on the discourse but wasn’t sure how to apply it when my dask dataframe is already partitioned a certain way, or whether I could change the partitioning halfway through the process. I also wasn’t sure whether this would be an efficient approach for, say, 600 columns and 20 million rows.

My overall process is to fit a linear model on each column, calculate an evaluation metric for that column, and store the result in a dictionary {column_name: metric_value}:

  1. loop over each column in a dask dataframe
  2. fit a logistic regression (dask-ml) using that column
  3. predict on a test set and record metric for that column in a dictionary

If I can’t use dask delayed and there is no better option, is there a way to delay the call to .fit() on the logistic regression? It seems to run as soon as it is called instead of being lazily evaluated.

Thank you in advance for any help you can offer!

Hi @Kaegan, I’m happy to see if I can help you out here. Are you able to provide a minimal reproducible code example that demonstrates the issue you’re running into?

From your question it seems like you might be running into two separate issues: one with transforming your columns in parallel and one with calling .fit() in parallel. Am I understanding that correctly?

Hi rrpelgrim,

Thank you so much for the response and help! You are correct about the issues, but it’s more of a combination of the two. In general, I am wondering if there is a best practice for parallelizing operations that need to be performed on each column individually. Specifically, I want to fit a logistic regression on each column of a dask dataframe and then score its performance, and I’m wondering whether I could do that in parallel instead of sequentially.

I could probably come up with a small reproducible example if that would help. I was just hoping there might be an easy general solution.

I seriously appreciate your help on this problem!

Have you looked at dask.dataframe.DataFrame.map_partitions in the Dask documentation?

Hello rrpelgrim,

I did look at map_partitions, but I assumed it would only operate on the individual row-wise partitions, so I wasn’t sure how it would work for applying a function across columns. In this case, would it fit a logistic regression on each partition? If so, I am not sure how a full linear model would result at the end.

Sorry if this is a very basic question. I am completely new to dask and still trying to understand the best way of doing things.

If you are able to make it work in pandas code, then you can use that pandas code with map_partitions to apply it across all your partitions.

Below is a toy example using the pandas .apply() method with the default axis=0, so the function is applied to each column. Not sure if this will work for your more elaborate use case, but it may be worth a shot:

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"col1": [5, 6, 7, 8], "col2": [1, 2, 3, 4]})
ddf = dd.from_pandas(df, npartitions=2)

# in pandas, use apply
df.apply(lambda x: x+1)

# result:
   col1  col2
0     6     2
1     7     3
2     8     4
3     9     5

# in dask, use map_partitions to perform apply on each partition
def my_func(df):
    # df is a plain pandas DataFrame holding one partition's rows
    return df.apply(lambda x: x + 1)

ddf.map_partitions(my_func).compute()

# result:
   col1  col2
0     6     2
1     7     3
2     8     4
3     9     5