Hello, I am a beginner with GPU-accelerated computing and I can't figure out what is wrong with my code. I am getting a serialization error and don't understand why.
Here is my code:
import numpy as np
import cudf
from dask.distributed import Client
from sklearn.metrics import classification_report
import pandas as pd
from dask_cuda import LocalCUDACluster
from cuml.dask.ensemble import RandomForestClassifier as cuRF
import dask_cudf
from cuml.dask.common.utils import persist_across_workers
import pickle
import cloudpickle
import dask_ml.model_selection as dcv
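# Build a small synthetic dataset: integer features A, B and C, a minute-frequency DATE column,
# and a continuous target Y that is later bucketed into three classes.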
def generate_synthetic_data(n_samples=10000):
    np.random.seed(42)
    Y = np.random.randn(n_samples)
    A = np.random.randint(0, 5, size=n_samples)
    B = np.random.randint(0, 5, size=n_samples)
    C = np.random.randint(1, 3, size=n_samples)
    DATE = pd.date_range(start='1/1/2022', periods=n_samples, freq='min')
    data = {
        'A': A,
        'B': B,
        'C': C,
        'DATE': DATE,
        'Y': Y,
    }
    return pd.DataFrame(data)
def main():
    # Initialize Dask client for GPU with LocalCUDACluster
    cluster = LocalCUDACluster()
    client = Client(cluster)

    # Load and preprocess data
    df_data = generate_synthetic_data()

    # Data preprocessing
    df_data['DATE'] = pd.to_datetime(df_data['DATE'], errors='coerce')
    df_data.fillna(0, inplace=True)
    df_data['C'] = df_data['C'].astype(float)
    df_data.drop_duplicates(inplace=True)
    df_data = df_data.loc[df_data['C'] == 1]
    df_data['Y_category'] = df_data['Y'].apply(lambda x: 'over 0' if x > 0 else ('under 0' if x < 0 else 'equal to 0'))
    df_encoded = df_data.drop(columns=['Y'])
    label_mapping = {'over 0': 2, 'under 0': 1, 'equal to 0': 0}
    df_encoded['Y_category'] = df_encoded['Y_category'].map(label_mapping)
    df_encoded.sort_values(by='DATE', inplace=True)
    df_encoded = cudf.DataFrame.from_pandas(df_encoded)

    # Split data into features and target
    X = df_encoded.drop(columns=['DATE', 'Y_category']).astype('float32')
    y = df_encoded['Y_category'].astype('int32')
    split_point = int(len(df_encoded) * 0.8)
    X_train, X_test = X.iloc[:split_point], X.iloc[split_point:]
    y_train, y_test = y.iloc[:split_point], y.iloc[split_point:]
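    # The data was sorted by DATE above, so this is a chronological 80/20 split rather than a random one.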
    # Balance the classes using undersampling
    y_train_counts = y_train.value_counts().to_pandas()
    min_samples = y_train_counts.min()
    sampled_indices = []
    for label in y_train_counts.index:
        indices = y_train[y_train == label].index.to_pandas().to_series()
        sampled = indices.sample(n=min_samples, random_state=42).tolist()
        sampled_indices.extend(sampled)
    sampled_indices = np.array(sampled_indices)

    # Ensure indices are unique and within bounds
    sampled_indices = np.unique(sampled_indices)
    sampled_indices = sampled_indices[sampled_indices < len(X_train)]
    X_train_balanced = X_train.iloc[sampled_indices]
    y_train_balanced = y_train.iloc[sampled_indices]
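    # Up to here the balanced training set is still a single-GPU cuDF DataFrame; the next few lines
    # split it into partitions and persist it on the Dask-CUDA workers so the distributed cuML
    # RandomForestClassifier can train on it.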
    # Convert to Dask DataFrame directly
    X_train_dask = dask_cudf.from_cudf(X_train_balanced, npartitions=10).persist(optimize_graph=True)
    y_train_dask = dask_cudf.from_cudf(y_train_balanced, npartitions=10).persist(optimize_graph=True)
    X_train_dask, y_train_dask = persist_across_workers(client,
                                                        [X_train_dask,
                                                         y_train_dask])

    # Define the parameter grid
    param_grid = {
        'max_depth': [10, 20, 30],
        'max_features': [0.1, 0.5, 0.75, "auto"],
        'n_estimators': [10, 20, 30]
    }

    # # Manually perform grid search
    # best_score = -np.inf
    # best_params = None
    # for n_estimators in param_grid['n_estimators']:
    #     for max_depth in param_grid['max_depth']:
    #         for max_features in param_grid['max_features']:
    #             rf = cuRF(n_estimators=n_estimators, max_depth=max_depth, max_features=max_features, random_state=42, n_streams=1)
    #             rf.fit(X_train_dask, y_train_dask, broadcast_data=True)
    #             y_pred = rf.predict(X_train_dask)
    #             score = classification_report(y_train_dask.compute().to_numpy(), y_pred.compute().to_numpy(), output_dict=True)['weighted avg']['f1-score']
    #             if score > best_score:
    #                 best_score = score
    #                 best_params = {
    #                     'n_estimators': n_estimators,
    #                     'max_depth': max_depth,
    #                     'max_features': max_features
    #                 }
    #
    # print(best_score)
    # print(best_params)
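    # NOTE: the grid_search.fit call in the section below is the line that raises the
    # serialization error shown in the traceback further down.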
    # Initialize and fit the model using GridSearchCV
    model_rf = cuRF(random_state=42)
    grid_search = dcv.GridSearchCV(model_rf, param_grid, cv=5, scoring='f1_weighted')
    grid_search.fit(X_train_dask, y_train_dask)  # Fit with Dask arrays

    # Train the model with the best parameters
    best_rf = cuRF(**grid_search.best_params_, random_state=42)
    best_rf.fit(X_train_dask, y_train_dask)
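    # predict returns a lazy Dask array here; .compute() materializes it and .get() copies it to
    # host memory so sklearn's classification_report (a CPU function) can consume it together
    # with y_test.to_pandas().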
    # Predict on the test set
    X_test_dask = dask_cudf.from_cudf(X_test, npartitions=1).to_dask_array(lengths=True)
    y_pred_best = best_rf.predict(X_test_dask)

    # Evaluate the model
    report_best = classification_report(y_test.to_pandas(), y_pred_best.compute().get())
    print(report_best)


if __name__ == "__main__":
    main()
And here is my error:
/home/username/miniconda3/envs/ML/bin/python /mnt/c/Users/username/PythonProjects/Snowflake/Stats/Clean RF - Forums.py
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 35291 instead
warnings.warn(
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/dask_cuda/utils.py:170: UserWarning: Cannot get CPU affinity for device with index 0, setting default affinity
warnings.warn(
A B C DATE Y Y_category
4 0 4 1.0 2022-01-01 00:04:00 -0.234153 under 0
6 3 3 1.0 2022-01-01 00:06:00 1.579213 over 0
9 2 2 1.0 2022-01-01 00:09:00 0.542560 over 0
13 4 1 1.0 2022-01-01 00:13:00 -1.913280 under 0
14 2 3 1.0 2022-01-01 00:14:00 -1.724918 under 0
... .. .. ... ... ... ...
9992 4 3 1.0 2022-01-07 22:32:00 0.662169 over 0
9996 2 3 1.0 2022-01-07 22:36:00 -1.998345 under 0
9997 4 3 1.0 2022-01-07 22:37:00 -0.705317 under 0
9998 3 3 1.0 2022-01-07 22:38:00 0.495766 over 0
9999 1 2 1.0 2022-01-07 22:39:00 0.644388 over 0
[4904 rows x 6 columns]
A B C DATE Y_category
4 0 4 1.0 2022-01-01 00:04:00 under 0
6 3 3 1.0 2022-01-01 00:06:00 over 0
9 2 2 1.0 2022-01-01 00:09:00 over 0
13 4 1 1.0 2022-01-01 00:13:00 under 0
14 2 3 1.0 2022-01-01 00:14:00 under 0
... .. .. ... ... ...
9992 4 3 1.0 2022-01-07 22:32:00 over 0
9996 2 3 1.0 2022-01-07 22:36:00 under 0
9997 4 3 1.0 2022-01-07 22:37:00 under 0
9998 3 3 1.0 2022-01-07 22:38:00 over 0
9999 1 2 1.0 2022-01-07 22:39:00 over 0
[4904 rows x 5 columns]
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/dask_expr/_collection.py:301: UserWarning: Dask annotations {'workers': ['tcp://127.0.0.1:35551']} detected. Annotations will be ignored when using query-planning.
warnings.warn(
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cuml/internals/api_decorators.py:344: UserWarning: For reproducible results in Random Forest Classifier or for almost reproducible results in Random Forest Regressor, n_streams=1 is recommended. If n_streams is > 1, results may vary due to stream/thread timing differences, even when random_state is set
return func(**kwargs)
2024-08-09 22:50:06,603 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f97d4f16e90>
0. 140290088916224
>.
Traceback (most recent call last):
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
return super().dump(obj)
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cuml/dask/common/base.py", line 60, in __getstate__
internal_model = self._get_internal_model().result()
AttributeError: 'NoneType' object has no attribute 'result'
Traceback (most recent call last):
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 366, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
frames[0] = pickle.dumps(
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
cp.dump(obj)
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
return super().dump(obj)
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cuml/dask/common/base.py", line 60, in __getstate__
internal_model = self._get_internal_model().result()
AttributeError: 'NoneType' object has no attribute 'result'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/mnt/c/Users/username/PythonProjects/Snowflake/Stats/Clean RF - Forums.py", line 137, in <module>
main()
File "/mnt/c/Users/username/PythonProjects/Snowflake/Stats/Clean RF - Forums.py", line 122, in main
grid_search.fit(X_train_dask, y_train_dask) # Fit with Dask arrays
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/dask_ml/model_selection/_search.py", line 1266, in fit
futures = scheduler(
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/client.py", line 3456, in get
futures = self._graph_to_futures(
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/client.py", line 3351, in _graph_to_futures
header, frames = serialize(ToPickle(dsk), on_error="raise")
File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 392, in serialize
raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f97d4f16e90>\n 0. 140290088916224\n>')
2024-08-09 22:50:06,611 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:35551' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-306fd2e3802a3a7b81c6a76a3a727041', 9), ('frompandas-774706f179cb38b2def7a1244ddd29fc', 7), '_func_get_params-8435bd472e94a239c8f55cc96e333e20', ('frompandas-a91f82e90b4590ee2b57246953d5e528', 8), ('frompandas-306fd2e3802a3a7b81c6a76a3a727041', 8), ('frompandas-8ff65de4a6fcee5b2828666823bb1b34', 9), '_construct_rf-360ad856-f8a8-496e-92d3-0d1ce9f2e6be'} (stimulus_id='handle-worker-cleanup-1723258206.6108973')
Process finished with exit code 1
I am using an RTX A4000, Python 3.10.9, NVIDIA-SMI 550.106, driver version 552.86, CUDA 12.4, cuDF 24.08.00a405, cuML 24.08.00a50, and Dask 2024.7.1.
Can someone please help me with this?