Thank you very much for your response. I am new to Dask and parallel computing, so my approach may be wrong.
My goal is to speed up this for loop:
brier_scores_list = [train_and_score(i) for i in range(X_train_cudf.shape[1])]
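For context, the serial version trains one single-GPU model per feature column, roughly like this (a simplified sketch, not my exact code; it assumes cuML's single-GPU LogisticRegression and the X_train_cudf / X_test_cudf / y_train_cudf / y_test_cudf objects created further down):

from cuml.linear_model import LogisticRegression as CumlLogisticRegression

def train_and_score(index):
    # Slice out a single feature column from the cuDF frames
    X_tr = X_train_cudf.iloc[:, index:index+1]
    X_te = X_test_cudf.iloc[:, index:index+1]
    model = CumlLogisticRegression()
    model.fit(X_tr, y_train_cudf)
    # With cuDF input, predict_proba mirrors the input type, so take
    # column 1 (positive class) with .iloc
    y_pred_test = model.predict_proba(X_te).iloc[:, 1]
    # Brier score: mean squared difference between label and predicted probability
    return float(((y_test_cudf - y_pred_test) ** 2).mean())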
Notes:
I am using a single GPU.
Below is my first approach and the error message I got with it:
# Imports (added here for completeness; the model is cuML's Dask
# LogisticRegression, which accepts a client argument)
import time
import dask
import cudf
import dask_cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from cuml.dask.linear_model import LogisticRegression

dask.config.set(scheduler='processes', serialization='pickle')
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0")
client = Client(cluster)

# Generate sample data
X, y = make_classification(n_samples=500, n_features=10000, n_classes=2, random_state=42)

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Move the NumPy arrays onto the GPU as cuDF objects
X_train_cudf = cudf.DataFrame(X_train)
X_test_cudf = cudf.DataFrame(X_test)
y_train_cudf = cudf.Series(y_train)
y_test_cudf = cudf.Series(y_test)

# Convert to Dask cuDF DataFrame/Series
X_train_dask_cudf = dask_cudf.from_cudf(X_train_cudf, npartitions=20).persist(optimize_graph=True)
X_test_dask_cudf = dask_cudf.from_cudf(X_test_cudf, npartitions=20).persist(optimize_graph=True)
y_train_dask_cudf = dask_cudf.from_cudf(y_train_cudf, npartitions=20).persist(optimize_graph=True)
y_test_dask_cudf = dask_cudf.from_cudf(y_test_cudf, npartitions=20).persist(optimize_graph=True)
def train_and_score(index, X_train, X_test, y_train, y_test):
    X_train_single_feature = X_train.iloc[:, index:index+1]
    X_test_single_feature = X_test.iloc[:, index:index+1]
    # Initialize and train the model
    model = LogisticRegression(client=client)
    model.fit(X_train_single_feature, y_train)
    # Predict probabilities and calculate Brier score
    y_pred_test = model.predict_proba(X_test_single_feature)[:, 1]
    brier_score = ((y_test - y_pred_test) ** 2).mean()
    return brier_score
def compute_brier_scores(feature_index):
    return train_and_score(feature_index, X_train_dask_cudf, X_test_dask_cudf, y_train_dask_cudf, y_test_dask_cudf)
# Using Dask's 'delayed' to parallelize
delayed_results = [dask.delayed(compute_brier_scores)(i) for i in range(X_train.shape[1])]
start_time = time.time()
# Trigger computation
brier_scores = dask.compute(*delayed_results)
end_time = time.time()
ERROR MESSAGE:
TypeError: cannot pickle '_asyncio.Task' object
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 10000 layers.
I also tried another approach, where I created batches of tasks to reduce the number of graph layers; these were the error messages I got:
TypeError: cannot pickle '_asyncio.Task' object
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 12 layers.
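For reference, that batching attempt looked roughly like this (simplified sketch; batch_size is just an example value):

batch_size = 1000
n_features = X_train.shape[1]
feature_batches = [list(range(i, min(i + batch_size, n_features)))
                   for i in range(0, n_features, batch_size)]

def compute_batch(indices):
    # One delayed task per batch of features instead of one per feature,
    # to shrink the number of graph layers
    return [compute_brier_scores(i) for i in indices]

delayed_batches = [dask.delayed(compute_batch)(batch) for batch in feature_batches]
batched_results = dask.compute(*delayed_batches)
brier_scores = [score for batch in batched_results for score in batch]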