DaskLGBMClassifier and Hypertuning using RandomizedSearchCV with DASK ECS Fargate Cluster

Hi community, we are trying to build a model on more than 2 million rows with 2K features. We are using DaskLGBMClassifier and then tuning the hyperparameters with RandomizedSearchCV. With a small dataset, such as 600 rows, it works fine, but when I increase the dataset to 67,000 rows, it starts throwing futures-related errors. Here is the sample code that we have been using. Any suggestions will be highly appreciated.

import boto3
import s3fs
import time
import dask
import dask.dataframe as dd
import lightgbm as lgb
import dask_ml.model_selection as dcv
# Get list of parquet files in the folder
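s3 = boto3.resource('s3')
bucket = s3.Bucket("my-bucket")  # placeholder: the bucket name was not given in the original post
file_list = []
for obj in bucket.objects.filter(Prefix="parquet"):
    if obj.key.endswith('.parquet'):
        file_list.append(f"s3://{bucket.name}/{obj.key}")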
# Load the data into a Dask dataframe
fs = s3fs.S3FileSystem(anon=False)
ddf = dd.read_parquet(file_list, storage_options={"anon": False, "use_ssl": True}, engine="pyarrow")
# Drop columns that are not used for modelling
columns_to_drop = ["feature1", "feature2", "feature3"]
ddf = ddf.drop(columns=columns_to_drop)
# Calculate the approximate number of rows that will give us 50MB
chunk_size = 50 * 1024 * 1024 
num_rows = len(ddf)
memory_usage = ddf.memory_usage(deep=True).sum().compute()
rows_per_chunk = int((chunk_size * num_rows) // memory_usage)
# Repartition the dataframe
num_partitions = num_rows // (rows_per_chunk // 2) + 1
ddf = ddf.repartition(npartitions=num_partitions)
X, y = ddf.drop(columns=["bad"]), ddf["bad"]
X_train, X_test, y_train, y_test = dcv.train_test_split(X, y, test_size=0.2, shuffle=True)
# Convert the Dask DataFrames to Dask Arrays and compute the chunk sizes
X_train = X_train.to_dask_array(lengths=True).persist()
X_test = X_test.to_dask_array(lengths=True).persist()
y_train = y_train.to_dask_array(lengths=True).persist()
y_test = y_test.to_dask_array(lengths=True).persist()
# Using LightGBM Classifier
clf = lgb.DaskLGBMClassifier(client=dclient)
clf = clf.fit(X_train, y_train)
# Using randomized search to find the best hyperparameters
# Randomized search not mandatory here, need some kind of hyperparameters tuning
param_space = {
    'learning_rate': [0.05, 0.1, 0.15],
    'max_depth': [3, 5, 7],
    'num_leaves': [15, 31, 63],
}
search = dcv.RandomizedSearchCV(clf, param_space, cv=5)
search.fit(X_train, y_train)
# Get the best hyperparameters and retrain the model with them
best_params = search.best_params_
clf = clf.set_params(**best_params).fit(X_train, y_train)

Hi @apokhrel647, welcome to the Dask community!

So to investigate what might be the problem, I’d try several things:

  • I don’t see where your dclient object is initialized; however, I assume this is a Dask Client pointing to a Distributed cluster (LocalCluster?). You should try to follow the computation in the Dashboard to get some insight.
  • If you run the code up to the multiple persist() calls, does it work? Are you able to persist all your inputs into the distributed cluster memory?
  • You are doing some repartitioning; I wonder if this is really necessary. Can’t you just read the DataFrame with the blocksize kwarg of read_parquet?
  • Does it work with the single model training, clf.fit call?
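To make the repartitioning point concrete, here is a minimal pure-Python sketch that replays the arithmetic from the posted code. The row count and dtype are assumptions for illustration only (67,000 rows of 2,000 float64 features); the real value of memory_usage would come from ddf.memory_usage(deep=True).sum().compute().

```python
# Hypothetical inputs: 67,000 rows, 2,000 float64 features (8 bytes each).
num_rows = 67_000
num_features = 2_000
memory_usage = num_rows * num_features * 8  # bytes; stand-in for the Dask call

chunk_size = 50 * 1024 * 1024  # 50 MiB target, as in the posted code

# Same two formulas as in the posted code
rows_per_chunk = int((chunk_size * num_rows) // memory_usage)
num_partitions = num_rows // (rows_per_chunk // 2) + 1

# Resulting average partition size
mib_per_partition = memory_usage / num_partitions / (1024 * 1024)

print(f"rows_per_chunk={rows_per_chunk}")
print(f"num_partitions={num_partitions}")
print(f"approx MiB per partition={mib_per_partition:.1f}")
```

Under these assumed numbers the partitions come out at roughly half of the 50 MiB target, because the formula divides rows_per_chunk by 2. That may be intentional, but it is worth checking against the comment in the code.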

RandomizedSearchCV involves a lot of computation, so the idea is to see if there are other limitations before firing it up.
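As a rough illustration of how much work the search adds, the sketch below counts the model fits for the posted param_space. It assumes n_iter=10 and refit=True, since the posted code does not set them; check the values your installed dask-ml version actually uses.

```python
from itertools import product

# Parameter grid copied from the posted code
param_space = {
    "learning_rate": [0.05, 0.1, 0.15],
    "max_depth": [3, 5, 7],
    "num_leaves": [15, 31, 63],
}

# Total number of distinct combinations available for sampling
n_combinations = len(list(product(*param_space.values())))

n_iter = 10  # assumption: number of sampled candidates (not set in the post)
cv = 5       # from the posted code

# One fit per sampled candidate per CV fold, plus one final refit
n_search_fits = n_iter * cv
n_total_fits = n_search_fits + 1  # assumption: refit=True

print(f"combinations={n_combinations}")
print(f"search fits={n_search_fits}")
print(f"total fits={n_total_fits}")
```

Each of those fits is a full distributed training run on the training data, so confirming that a single clf.fit succeeds first saves a lot of debugging time.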

Hi @apokhrel647, here is some material that might give you ideas for your use case. In the notebooks in this repository, we do hyperparameter optimization (HPO) using Optuna and train an XGBoost model with custom cross-validation. It’s not your exact problem, but these notebooks will walk you through how to scale a problem similar to the one you are having. For your case, I’d recommend you look at the Modeling notebooks 1, 2, and 3 in this repo, and especially at number 3, which shows how to perform HPO with Optuna in multicluster and distributed XGBoost on a dataset of 1.4 billion rows.

Extra: The last notebook has its outputs cleared, but if you want a full example with outputs, you can read it here.