Does dask_ml.model_selection.GridSearchCV support GPU by applying LocalCUDACluster()?

Dear Representative,

Is dask_ml.model_selection.GridSearchCV supported on GPUs when it is used with LocalCUDACluster()? I am trying to train sklearn.neural_network.MLPRegressor with dask_ml.model_selection.GridSearchCV, and I don’t know whether the approach below is the correct way to run this task on local GPUs. If not, what is the correct way to do it? Thank you so much.

The code I am using is below:
#===#
import os
import time
import tracemalloc
import joblib
import pandas as pd
import dask.array as da
# Load the pre-pickled train/test splits from the current working directory.
# (The forum rendering had turned the quotes into typographic quotes, which
# Python rejects as a SyntaxError; plain ASCII quotes are restored here.)
X_train=joblib.load(os.getcwd()+'/X_train.pkl')
X_test=joblib.load(os.getcwd()+'/X_test.pkl')
y_train=joblib.load(os.getcwd()+'/y_train.pkl')
y_test=joblib.load(os.getcwd()+'/y_test.pkl')
#==#
#y_train=pd.Series(y_train.iloc[:,0]).ravel()
#y_test=pd.Series(y_test.iloc[:,0]).ravel()
#==#
from dask import dataframe as dd
# Dask-partitioned copies of the training data; ST() fits on these.
X_trainO = dd.from_pandas(X_train, npartitions=3)
#X_test = dd.from_pandas(X_test, npartitions=3)
#X_test = dd.from_array(X_test)
y_trainO = dd.from_pandas(pd.DataFrame(y_train), npartitions=3)
#y_test = dd.from_pandas(pd.DataFrame(y_test), npartitions=3)
#y_test = dd.from_array(y_test)
# Hyper-parameter grid for the search, also stored as a pickle.
param_list=joblib.load(os.getcwd()+'/param_list.pkl')
njobs=3
def ST(X_train, X_test, y_train, y_test, param_list, njobs):
    """Grid-search an sklearn ``MLPRegressor`` with dask-ml and write a report.

    A ``LocalCUDACluster`` and a ``Client`` are started for the duration of
    the search. Creating the ``Client`` registers it as the default Dask
    scheduler, which is what dask-ml's ``GridSearchCV`` uses when no
    ``scheduler`` argument is given, so nothing has to be attached to the
    estimator. Note that scikit-learn's ``MLPRegressor`` itself has no GPU
    support: the cluster only distributes the CPU fits.

    Args:
        X_train: Training features; only its companion ``y_train`` length is
            reported here. NOTE: the fit itself uses the module-level Dask
            collections ``X_trainO`` / ``y_trainO``, as in the original post.
        X_test: Test features used for the R2 and MSE figures.
        y_train: Training target (used for the case count).
        y_test: Test target.
        param_list: Parameter grid passed to ``GridSearchCV``.
        njobs: ``n_jobs`` value passed to ``GridSearchCV``.

    Returns:
        None. Writes ``Model-Result.pkl``, ``Model-Performance.txt`` and
        ``Model-Comparison.csv`` into the current working directory.
    """
    import os
    import time
    import tracemalloc
    import pandas as pd
    import numpy as np
    import sklearn.neural_network as snn
    import joblib
    import dask_ml.model_selection as dcv
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # Context managers make sure the workers and the client are shut down
    # even when the search fails.
    with LocalCUDACluster() as cluster, Client(cluster):
        start_time = time.time()
        tracemalloc.start()
        try:
            regressor = dcv.GridSearchCV(
                snn.MLPRegressor(verbose=True), param_list, cv=5, n_jobs=njobs
            )
            regressor.fit(X_trainO, y_trainO)
            print(regressor)
            current, peak = tracemalloc.get_traced_memory()
            elapsed_time = time.time() - start_time
        finally:
            tracemalloc.stop()

        best_score = regressor.best_score_
        test_r2 = regressor.score(X_test, y_test)
        test_mse = np.mean((regressor.predict(X_test) - y_test) ** 2)

    joblib.dump(regressor, 'Model-Result.pkl')

    with open(os.getcwd() + '/Model-Performance.txt', 'w') as g:
        g.write("Mean cross-validated score of the best_estimator: {:.2f}".format(best_score) + '\n')
        g.write("Test dataset R2 score: {:.2f}".format(test_r2) + '\n')
        g.write("Test dataset MSError: {:.2f}".format(test_mse) + '\n')
        g.write("Test dataset Case Number: {:.2f}".format(len(y_test)) + '\n')
        g.write("Train dataset Case Number: {:.2f}".format(len(y_train)) + '\n')
        g.write('Model Fitting Computational Time: ' + str(float(elapsed_time) / 3600) + 'hours\n')
        # Bytes -> megabytes; the pasted "106" was 10**6 with the "**" eaten
        # by the forum markup.
        g.write("Model Fitting of memory usage is {" + str(int(current) / (10 ** 6))
                + "}MB; Peak was {" + str(int(peak) / (10 ** 6)) + "}MB" + '\n')

    df = pd.DataFrame(
        regressor.cv_results_,
        columns=['params', 'rank_test_score', 'std_fit_time', 'mean_score_time',
                 'std_score_time', 'split0_test_score', 'split1_test_score',
                 'split2_test_score', 'mean_test_score', 'std_test_score',
                 'param_activation', 'param_alpha', 'param_hidden_layer_sizes',
                 'param_solver'],
    )
    df.to_csv(os.getcwd() + '/Model-Comparison.csv', index=False, header=True)
    return
# Run the pipeline only when executed as a script. The double underscores and
# plain quotes were lost in the forum rendering (`name == ‘main’`).
if __name__ == '__main__':
    ST(X_train, X_test, y_train, y_test, param_list, njobs)
#=====#

Hi @MQMQ2018, thanks so much for this question! Would you be able to share a minimal reproducer, so we can better answer your question (see here and here for examples)? In the meantime, if you haven’t seen them already, you might find this RAPIDS guide and this page in the Dask docs helpful.

1 Like

Dear @scharlottej13, Thank you so much for your help and reply.

Here I have modified the code so that the data is generated automatically, following the minimal-reproducer guidelines. If possible, could you help check whether the approach below is the correct way to run my task on local GPUs? If not, what is the correct way to do it? Thank you so much.

The modified code I am using is below:

#===#
import os
import time
import tracemalloc
import joblib
import pandas as pd
import dask.array as da
# numpy is needed by the np.random calls below but was only imported inside
# ST(); without this import the script stops with a NameError.
import numpy as np

# Synthetic stand-in data: 50 feature columns F0..F49, rows labelled R0..R1670
# (the first 335 rows form the test set, the remaining 1336 the training set).
feature_names = ['F' + str(i) for i in range(50)]
X_test = pd.DataFrame(
    np.random.rand(335, 50),
    columns=feature_names,
    index=['R' + str(i) for i in range(335)],
)
X_train = pd.DataFrame(
    np.random.rand(1336, 50),
    columns=feature_names,
    index=['R' + str(i) for i in range(335, 1671)],
)
y_test = np.random.rand(335,)
y_train = np.random.rand(1336,)

# Grid handed to GridSearchCV: every combination of these values is fitted.
param_list = {
    'hidden_layer_sizes': [
        [500], [100], [50], [10], [5],
        [500, 100], [500, 50], [500, 10], [500, 5], [100, 50],
        [100, 10], [100, 5], [50, 10], [50, 5], [10, 5],
        [500, 100, 50], [500, 100, 10], [500, 100, 5], [500, 50, 10],
        [500, 50, 5], [500, 10, 5], [100, 50, 10], [100, 50, 5],
        [100, 10, 5], [50, 10, 5],
    ],
    'activation': ['logistic'],
    'solver': ['adam', 'sgd'],
    'alpha': [5e-05],
    'learning_rate_init': [0.0001],
    'max_iter': [100000],
    'batch_size': [200, 20],
}
njobs = 3
def ST(X_train, X_test, y_train, y_test, param_list, njobs):
    """Grid-search an sklearn ``MLPRegressor`` with dask-ml and write a report.

    A ``LocalCUDACluster`` and a ``Client`` are started for the duration of
    the search. Creating the ``Client`` registers it as the default Dask
    scheduler, which is what dask-ml's ``GridSearchCV`` uses when no
    ``scheduler`` argument is given, so nothing has to be attached to the
    estimator (the former ``estimator.client = client`` only set an unused
    attribute on the scikit-learn class). Note that scikit-learn's
    ``MLPRegressor`` itself has no GPU support: the cluster only distributes
    the CPU fits.

    Args:
        X_train: Training features.
        X_test: Test features used for the R2 and MSE figures.
        y_train: Training target.
        y_test: Test target.
        param_list: Parameter grid passed to ``GridSearchCV``.
        njobs: ``n_jobs`` value passed to ``GridSearchCV``.

    Returns:
        None. Writes ``Model-Result.pkl``, ``Model-Performance.txt`` and
        ``Model-Comparison.csv`` into the current working directory.
    """
    import os
    import time
    import tracemalloc
    import pandas as pd
    import numpy as np
    import sklearn.neural_network as snn
    import joblib
    import dask_ml.model_selection as dcv
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # Context managers make sure the workers and the client are shut down
    # even when the search fails.
    with LocalCUDACluster() as cluster, Client(cluster):
        start_time = time.time()
        tracemalloc.start()
        try:
            regressor = dcv.GridSearchCV(
                snn.MLPRegressor(verbose=True), param_list, cv=5, n_jobs=njobs
            )
            regressor.fit(X_train, y_train)
            print(regressor)
            current, peak = tracemalloc.get_traced_memory()
            elapsed_time = time.time() - start_time
        finally:
            # Always stop tracing, otherwise a failed fit leaves it enabled.
            tracemalloc.stop()

        best_score = regressor.best_score_
        test_r2 = regressor.score(X_test, y_test)
        test_mse = np.mean((regressor.predict(X_test) - y_test) ** 2)

    joblib.dump(regressor, 'Model-Result.pkl')

    with open(os.getcwd() + '/Model-Performance.txt', 'w') as g:
        g.write("Mean cross-validated score of the best_estimator: {:.2f}".format(best_score) + '\n')
        g.write("Test dataset R2 score: {:.2f}".format(test_r2) + '\n')
        g.write("Test dataset MSError: {:.2f}".format(test_mse) + '\n')
        g.write("Test dataset Case Number: {:.2f}".format(len(y_test)) + '\n')
        g.write("Train dataset Case Number: {:.2f}".format(len(y_train)) + '\n')
        g.write('Model Fitting Computational Time: ' + str(float(elapsed_time) / 3600) + 'hours\n')
        # tracemalloc reports bytes; divide by 10**6 for megabytes.
        g.write("Model Fitting of memory usage is {" + str(int(current) / (10 ** 6))
                + "}MB; Peak was {" + str(int(peak) / (10 ** 6)) + "}MB" + '\n')

    df = pd.DataFrame(
        regressor.cv_results_,
        columns=['params', 'rank_test_score', 'std_fit_time', 'mean_score_time',
                 'std_score_time', 'split0_test_score', 'split1_test_score',
                 'split2_test_score', 'mean_test_score', 'std_test_score',
                 'param_activation', 'param_alpha', 'param_hidden_layer_sizes',
                 'param_solver'],
    )
    df.to_csv(os.getcwd() + '/Model-Comparison.csv', index=False, header=True)
    return
# Script entry point: run the search/report pipeline on the data built above.
if __name__ == '__main__':
    ST(
        X_train=X_train,
        X_test=X_test,
        y_train=y_train,
        y_test=y_test,
        param_list=param_list,
        njobs=njobs,
    )
#=====#

Thank you so much @MQMQ2018 for the edited example! @jacobtomlinson do you have any guidance on the recommended way for checking that a GPU is working as expected with RAPIDS?

If you expect sklearn MLPRegressor to use GPU for training, I fear this is not possible as stated at the top of this page.

Warning: This implementation is not intended for large-scale applications. In particular, scikit-learn offers no GPU support. For much faster, GPU-based implementations, as well as frameworks offering much more flexibility to build deep learning architectures, see Related Projects.

So first of all, you should begin by using Pytorch or TensorFlow.

After that, I’m not sure if LocalCudaCluster is needed for training Deep Learning models using GPU with Dask, or if having GPU enabled Python library of Pytorch or Keras/Tensorflow is enough.

3 Likes

Dear @guillaumeeb, @scharlottej13
Thank you so much for your help and reply.
Is there any GPU-supported function in PyTorch or TensorFlow that plays a role similar to “sklearn.neural_network.MLPRegressor”?
Best,
Qi

Thank you for your information and help.
Is there any GPU-supported function in PyTorch or TensorFlow that plays a role similar to “sklearn.neural_network.MLPRegressor”? Thank you so much.

Best,
Qi

@guillaumeeb

Thank you for your information and help.
Is there any GPU-supported function in PyTorch or TensorFlow that plays a role similar to “sklearn.neural_network.MLPRegressor”? Thank you so much.

Best,
Qi

Yup. So Qi the library you are looking for is skorch (GitHub - skorch-dev/skorch: A scikit-learn compatible neural network library that wraps PyTorch) , which does exactly what you need and you can use LocalCudaCluster for parallelizing it across multiple GPUs with dask . See below example.

Please note that you can also use distributed's own cluster by setting it up with CUDA_VISIBLE_DEVICES as described here, but LocalCUDACluster is often cleaner, more robust, and easier to set up when using GPUs.

import numpy as np
from sklearn.datasets import make_classification
from torch import nn

from skorch import NeuralNetClassifier
from sklearn.model_selection import GridSearchCV
from dask.distributed import Client
from dask_cuda import LocalCUDACluster


# Start a local CUDA cluster and connect a distributed client to it.
cluster = LocalCUDACluster()
client = Client(cluster)

# Synthetic classification data; presumably 10000 samples x 20 features (this
# matches the 20-input first layer of MyModule below) -- confirm against the
# make_classification signature.
X, y = make_classification(10000, 20, n_informative=10, random_state=0)
# Cast features to float32 and labels to int64 before handing them to the net.
X = X.astype(np.float32)
y = y.astype(np.int64)

class MyModule(nn.Module):
    """Small fully connected classifier: 20 inputs -> two hidden layers -> 2 class probabilities.

    Args:
        num_units: Width of both hidden layers.
        nonlin: Activation module applied after each hidden layer.
    """

    def __init__(self, num_units=10, nonlin=nn.ReLU()):
        super().__init__()

        # Sub-modules are registered in the same order as before so that
        # parameter names and initialisation order are unchanged.
        self.dense0 = nn.Linear(20, num_units)
        self.nonlin = nonlin
        self.dropout = nn.Dropout(0.5)
        self.dense1 = nn.Linear(num_units, num_units)
        self.output = nn.Linear(num_units, 2)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, X, **kwargs):
        """Return per-class probabilities (softmax over the last axis) for ``X``."""
        hidden = self.dropout(self.nonlin(self.dense0(X)))
        hidden = self.nonlin(self.dense1(hidden))
        logits = self.output(hidden)
        return self.softmax(logits)


# Wrap MyModule in skorch's scikit-learn style classifier; device='cuda'
# requests training on the GPU.
net = NeuralNetClassifier(
    MyModule,
    max_epochs=10,
    lr=0.1,
    # Shuffle training data on each epoch
    iterator_train__shuffle=True,
    device='cuda'
)

# deactivate skorch-internal train-valid split and verbose logging
net.set_params(train_split=False, verbose=0)
# Search grid: 2 learning rates x 2 epoch counts x 2 hidden widths = 8 candidates.
# The 'module__' prefix addresses MyModule's own num_units argument.
params = {
    'lr': [0.01, 0.02],
    'max_epochs': [10, 20],
    'module__num_units': [10, 20],
}

# ## with joblib
from joblib import parallel_backend
# Plain scikit-learn GridSearchCV (not the dask-ml one), scored by accuracy.
gs = GridSearchCV(
    net,
    params,
    scoring='accuracy',
    )
# Run the fit under joblib's 'dask' backend; presumably this dispatches the
# individual fits to the Dask client created above -- confirm in joblib docs.
with parallel_backend('dask'):
    gs.fit(X, y)

Also note that dask-ml's GridSearchCV is not working currently but using joblib with dask-cuda cluster like above should still work. Exploring what needs to be done to enable dask-ml.

1 Like

Dear @VibhuJawa,

Merry Christmas!!

Thank you so much for your guidance. It is very helpful.
Since my original work is a regression task using snn.MLPRegressor, may I use the approach described in
“skorch.regressor — skorch 0.11.0 documentation”?
And do you think the approach below is the right way to go? Thank you so much.

#==#

import numpy as np
from torch import nn

from skorch import NeuralNetRegressor
from sklearn.model_selection import GridSearchCV
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Start a local CUDA cluster and connect a distributed client to it.
cluster = LocalCUDACluster()
client = Client(cluster)

import os
import pandas as pd
import dask.array as da
# Labelled synthetic data: 1671 rows (R0..R1670) x 50 features (F0..F49).
X_frame = pd.DataFrame(
    np.random.rand(1671, 50),
    columns=['F' + str(i) for i in range(50)],
    index=['R' + str(i) for i in range(1671)],
)
# torch layers hold float32 weights and the module's forward() takes a single
# tensor, so pass a plain float32 array instead of a float64 DataFrame.
X = X_frame.to_numpy(dtype=np.float32)
# skorch's NeuralNetRegressor expects a 2-D float32 target of shape
# (n_samples, 1), not a 1-D float64 vector.
y = np.random.rand(1671,).astype(np.float32).reshape(-1, 1)

class MyModule(nn.Module):
    """Small fully connected regression network.

    The surrounding script builds a feature matrix with 50 columns and a
    continuous target, so the first layer takes 50 inputs by default (the
    previous hard-coded 20 raised a shape error) and the head is a single
    linear unit (the previous 2-unit softmax produced class probabilities,
    which is not a regression output).

    Args:
        num_units: Width of both hidden layers.
        nonlin: Activation module applied after each hidden layer.
        in_features: Number of input features per sample.
    """

    def __init__(self, num_units=10, nonlin=nn.ReLU(), in_features=50):
        super().__init__()

        self.dense0 = nn.Linear(in_features, num_units)
        self.nonlin = nonlin
        self.dropout = nn.Dropout(0.5)
        self.dense1 = nn.Linear(num_units, num_units)
        # One unbounded output per sample: the predicted target value.
        self.output = nn.Linear(num_units, 1)

    def forward(self, X, **kwargs):
        """Return predictions of shape ``(batch, 1)`` for ``X`` of shape ``(batch, in_features)``."""
        X = self.nonlin(self.dense0(X))
        X = self.dropout(X)
        X = self.nonlin(self.dense1(X))
        return self.output(X)

from skorch import NeuralNetRegressor
from torch import optim

# The target is continuous, so wrap the module in NeuralNetRegressor.
# (NeuralNetClassifier was also never imported in this script and raised a
# NameError.)
net = NeuralNetRegressor(
    MyModule,
    max_epochs=10,
    lr=0.1,
    # Shuffle training data on each epoch
    iterator_train__shuffle=True,
    device='cuda'
)

# deactivate skorch-internal train-valid split and verbose logging
net.set_params(train_split=False, verbose=0)

# Only names that skorch understands may appear in the grid; the
# scikit-learn MLPRegressor names are rejected by set_params. They are
# translated to their skorch counterparts where one exists:
#   activation='logistic' -> module__nonlin=nn.Sigmoid()
#   solver='adam'/'sgd'   -> optimizer=optim.Adam / optim.SGD
#   alpha (L2 penalty)    -> optimizer__weight_decay
#   max_iter              -> already covered by max_epochs
#   hidden_layer_sizes    -> not expressible with MyModule, which exposes a
#                            single hidden width; use module__num_units
params = {
    'lr': [0.01, 0.02],
    'max_epochs': [10, 20],
    'module__num_units': [10, 20],
    'module__nonlin': [nn.Sigmoid()],
    'optimizer': [optim.Adam, optim.SGD],
    'optimizer__weight_decay': [5e-05],
    'batch_size': [200, 20],
}

## with joblib

from joblib import parallel_backend
# 'accuracy' is a classification metric and fails on continuous targets;
# score the regression by (negated) mean squared error instead.
gs = GridSearchCV(
    net,
    params,
    scoring='neg_mean_squared_error',
    )
with parallel_backend('dask'):
    gs.fit(X, y)

#==#

I think there is a minor mistake here, you should use NeuralNetRegressor instead of NeuralNetClassifier .

from skorch import NeuralNetRegressor
net = NeuralNetRegressor(

Apart from that it looks good to me.

1 Like

Got you. Thank you so much, @VibhuJawa .

Best,
Qi