How to parallelize a loop that applies the same model to different data

I am learning dask and facing a challenge. The following code is a minimum replication code. Please kindly advise how to write a compact code to parallelize a loop that applies the same model class to different size of data.

import dask
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression

samples=[make_regression(n_samples=100,n_features=nx,noise=10,random_state=10) for nx in range(1,30)]

# This is a simple OLS here, but I want to define a big model somewhere else
model = LinearRegression()

def reg(y,X,f):
    y = pd.DataFrame(y)
    X = pd.DataFrame(X)
    y_pred = f.fit(y=y.iloc[:-1,:],X=X.iloc[:-1,:]).predict(X=X.iloc[-1:,:])
    return y_pred

# this gives error saying ValueError: X has 9 features, but LinearRegression is expecting 20 features as input. Sometimes 9 becomes 6, 13, 17 etc, a bit random...
results = dask.compute(*[dask.delayed(reg)(X=X,y=y,f=model) for (X,y) in samples])

# This works, which is puzzling since I thought dask just parallelly process the same thing...
results = [reg(y=y,X=X,f=model) for (X,y) in samples]

# this works, so the error seems to come from initiating a class? 
results = dask.compute(*[dask.delayed(reg)(X=X,y=y,f=LinearRegression()) for (X,y) in samples])
# This is inconvenient since I need to write a big model inline. How can I use "model" instead of "LinearRegression()" to write the code?

Hi @sakaiando, welcome to Dask Discourse forum!

A solution would be to use a scheduler based on multiprocess, so either creating a LocalCluster object and associated Client, or use this syntax:

results = dask.compute(*[dask.delayed(reg)(X=X,y=y,f=model) for (X,y) in samples], scheduler="multiprocessing")

Distributed scheduler and multiprocessing are not the same. Also keep in mind this involved starting a process for every Worker, and serialization between processes. This means some overhead, especially visible in this simple example.

I’m not really sure why this fail using threads, but I think that it means sharing the same object with concurrent calls to fit, and so probably race conditions.

Thanks @guillaumeeb! This is very useful to know. I see scheduler=‘processes’ also works.

1 Like