Serialization problem when using GridSearchCV.fit

Hello, I am a beginner with GPU-accelerated computing and I can’t figure out what is wrong with my code. I am getting a serialization error and don’t understand why.

Here is my code:

import numpy as np
import cudf
from dask.distributed import Client
from sklearn.metrics import classification_report
import pandas as pd
from dask_cuda import LocalCUDACluster
from cuml.dask.ensemble import RandomForestClassifier as cuRF
import dask_cudf
from cuml.dask.common.utils import persist_across_workers
import pickle
import cloudpickle
import dask_ml.model_selection as dcv

def generate_synthetic_data(n_samples=10000):
    np.random.seed(42)
    Y = np.random.randn(n_samples)
    A = np.random.randint(0, 5, size=n_samples)
    B = np.random.randint(0, 5, size=n_samples)
    C = np.random.randint(1, 3, size=n_samples)
    DATE = pd.date_range(start='1/1/2022', periods=n_samples, freq='min')

    data = {
        'A': A,
        'B': B,
        'C': C,
        'DATE': DATE,
        'Y': Y,
    }
    return pd.DataFrame(data)

def main():
    # Initialize Dask client for GPU with LocalCUDACluster
    cluster = LocalCUDACluster()
    client = Client(cluster)

    # Load and preprocess data
    df_data = generate_synthetic_data()

    # Data preprocessing
    df_data['DATE'] = pd.to_datetime(df_data['DATE'], errors='coerce')
    df_data.fillna(0, inplace=True)
    df_data['C'] = df_data['C'].astype(float)
    df_data.drop_duplicates(inplace=True)
    df_data = df_data.loc[df_data['C'] == 1]

    df_data['Y_category'] = df_data['Y'].apply(lambda x: 'over 0' if x > 0 else ('under 0' if x < 0 else 'equal to 0'))
    df_encoded = df_data.drop(columns=['Y'])

    label_mapping = {'over 0': 2, 'under 0': 1, 'equal to 0': 0}
    df_encoded['Y_category'] = df_encoded['Y_category'].map(label_mapping)
    df_encoded.sort_values(by='DATE', inplace=True)

    df_encoded = cudf.DataFrame.from_pandas(df_encoded)

    # Split data into features and target
    X = df_encoded.drop(columns=['DATE', 'Y_category']).astype('float32')
    y = df_encoded['Y_category'].astype('int32')
    split_point = int(len(df_encoded) * 0.8)
    X_train, X_test = X.iloc[:split_point], X.iloc[split_point:]
    y_train, y_test = y.iloc[:split_point], y.iloc[split_point:]

    # Balance the classes using undersampling
    y_train_counts = y_train.value_counts().to_pandas()
    min_samples = y_train_counts.min()

    sampled_indices = []
    for label in y_train_counts.index:
        indices = y_train[y_train == label].index.to_pandas().to_series()
        sampled = indices.sample(n=min_samples, random_state=42).tolist()
        sampled_indices.extend(sampled)

    sampled_indices = np.array(sampled_indices)

    # Ensure indices are unique and within bounds
    sampled_indices = np.unique(sampled_indices)
    sampled_indices = sampled_indices[sampled_indices < len(X_train)]

    X_train_balanced = X_train.iloc[sampled_indices]
    y_train_balanced = y_train.iloc[sampled_indices]

    # Convert to Dask DataFrame directly
    X_train_dask = dask_cudf.from_cudf(X_train_balanced, npartitions=10).persist(optimize_graph=True)
    y_train_dask = dask_cudf.from_cudf(y_train_balanced, npartitions=10).persist(optimize_graph=True)

    X_train_dask, y_train_dask = persist_across_workers(client,
                                                      [X_train_dask,
                                                       y_train_dask])

    #Define the parameter grid
    param_grid = {
        'max_depth': [10, 20, 30],
        'max_features': [0.1, 0.5, 0.75, "auto"],
        'n_estimators': [10, 20, 30]
    }

    # # Manually perform grid search
    # best_score = -np.inf
    # best_params = None
    # for n_estimators in param_grid['n_estimators']:
    #     for max_depth in param_grid['max_depth']:
    #         for max_features in param_grid['max_features']:
    #             rf = cuRF(n_estimators=n_estimators, max_depth=max_depth, max_features=max_features, random_state=42, n_streams=1)
    #             rf.fit(X_train_dask, y_train_dask,broadcast_data=True)
    #             y_pred = rf.predict(X_train_dask)
    #             score = classification_report(y_train_dask.compute().to_numpy(), y_pred.compute().to_numpy(), output_dict=True)['weighted avg']['f1-score']
    #             if score > best_score:
    #                 best_score = score
    #                 best_params = {
    #                     'n_estimators': n_estimators,
    #                     'max_depth': max_depth,
    #                     'max_features': max_features
    #                 }
    #
    # print(best_score)
    # print(best_params)

    # Initialize and fit the model using GridSearchCV
    model_rf = cuRF(random_state=42)
    grid_search = dcv.GridSearchCV(model_rf, param_grid, cv=5, scoring='f1_weighted')
    grid_search.fit(X_train_dask, y_train_dask)  # Fit with Dask arrays

    # Train the model with the best parameters
    best_rf = cuRF(**grid_search.best_params_, random_state=42)
    best_rf.fit(X_train_dask, y_train_dask)

    # Predict on the test set
    X_test_dask = dask_cudf.from_cudf(X_test, npartitions=1).to_dask_array(lengths=True)
    y_pred_best = best_rf.predict(X_test_dask)

    # Evaluate the model
    report_best = classification_report(y_test.to_pandas(), y_pred_best.compute().get())
    print(report_best)

if __name__ == "__main__":
    main()

And here is my error:

/home/username/miniconda3/envs/ML/bin/python /mnt/c/Users/username/PythonProjects/Snowflake/Stats/Clean RF - Forums.py 
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 35291 instead
  warnings.warn(
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/dask_cuda/utils.py:170: UserWarning: Cannot get CPU affinity for device with index 0, setting default affinity
  warnings.warn(
      A  B    C                DATE         Y Y_category
4     0  4  1.0 2022-01-01 00:04:00 -0.234153    under 0
6     3  3  1.0 2022-01-01 00:06:00  1.579213     over 0
9     2  2  1.0 2022-01-01 00:09:00  0.542560     over 0
13    4  1  1.0 2022-01-01 00:13:00 -1.913280    under 0
14    2  3  1.0 2022-01-01 00:14:00 -1.724918    under 0
...  .. ..  ...                 ...       ...        ...
9992  4  3  1.0 2022-01-07 22:32:00  0.662169     over 0
9996  2  3  1.0 2022-01-07 22:36:00 -1.998345    under 0
9997  4  3  1.0 2022-01-07 22:37:00 -0.705317    under 0
9998  3  3  1.0 2022-01-07 22:38:00  0.495766     over 0
9999  1  2  1.0 2022-01-07 22:39:00  0.644388     over 0

[4904 rows x 6 columns]
      A  B    C                DATE Y_category
4     0  4  1.0 2022-01-01 00:04:00    under 0
6     3  3  1.0 2022-01-01 00:06:00     over 0
9     2  2  1.0 2022-01-01 00:09:00     over 0
13    4  1  1.0 2022-01-01 00:13:00    under 0
14    2  3  1.0 2022-01-01 00:14:00    under 0
...  .. ..  ...                 ...        ...
9992  4  3  1.0 2022-01-07 22:32:00     over 0
9996  2  3  1.0 2022-01-07 22:36:00    under 0
9997  4  3  1.0 2022-01-07 22:37:00    under 0
9998  3  3  1.0 2022-01-07 22:38:00     over 0
9999  1  2  1.0 2022-01-07 22:39:00     over 0

[4904 rows x 5 columns]
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/dask_expr/_collection.py:301: UserWarning: Dask annotations {'workers': ['tcp://127.0.0.1:35551']} detected. Annotations will be ignored when using query-planning.
  warnings.warn(
/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cuml/internals/api_decorators.py:344: UserWarning: For reproducible results in Random Forest Classifier or for almost reproducible results in Random Forest Regressor, n_streams=1 is recommended. If n_streams is > 1, results may vary due to stream/thread timing differences, even when random_state is set
  return func(**kwargs)
2024-08-09 22:50:06,603 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f97d4f16e90>
 0. 140290088916224
>.
Traceback (most recent call last):
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cuml/dask/common/base.py", line 60, in __getstate__
    internal_model = self._get_internal_model().result()
AttributeError: 'NoneType' object has no attribute 'result'
Traceback (most recent call last):
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function _concat at 0x7f98a0bca050>: it's not the same object as dask.dataframe.core._concat

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/cuml/dask/common/base.py", line 60, in __getstate__
    internal_model = self._get_internal_model().result()
AttributeError: 'NoneType' object has no attribute 'result'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/mnt/c/Users/username/PythonProjects/Snowflake/Stats/Clean RF - Forums.py", line 137, in <module>
    main()
  File "/mnt/c/Users/username/PythonProjects/Snowflake/Stats/Clean RF - Forums.py", line 122, in main
    grid_search.fit(X_train_dask, y_train_dask)  # Fit with Dask arrays
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/dask_ml/model_selection/_search.py", line 1266, in fit
    futures = scheduler(
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/client.py", line 3456, in get
    futures = self._graph_to_futures(
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/client.py", line 3351, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
  File "/home/username/miniconda3/envs/ML/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f97d4f16e90>\n 0. 140290088916224\n>')
2024-08-09 22:50:06,611 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:35551' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-306fd2e3802a3a7b81c6a76a3a727041', 9), ('frompandas-774706f179cb38b2def7a1244ddd29fc', 7), '_func_get_params-8435bd472e94a239c8f55cc96e333e20', ('frompandas-a91f82e90b4590ee2b57246953d5e528', 8), ('frompandas-306fd2e3802a3a7b81c6a76a3a727041', 8), ('frompandas-8ff65de4a6fcee5b2828666823bb1b34', 9), '_construct_rf-360ad856-f8a8-496e-92d3-0d1ce9f2e6be'} (stimulus_id='handle-worker-cleanup-1723258206.6108973')

Process finished with exit code 1

I am using a RTX A4000. Python 3.10.9. NVIDIA-SMI 550.106. Driver Version: 552.86. CUDA Version: 12.4. cuDF version: 24.08.00a405. cuML version: 24.08.00a50. Dask version: 2024.7.1

Can someone please help me with this!

Hi @LuckyLuke, welcome to the Dask Discourse forum!

I edited your code to make it a bit more readable.

Did you try to simplify the problem? Does your code work when not using the CUDA packages, so only on CPU? Are you able to train a RF model without doing a GridSearch?

Nothing jumps out at me right now.
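
For instance, a quick check could be fitting cuML's Dask RandomForest directly on your persisted data, with no GridSearchCV involved (rough sketch only, reusing the client and the X_train_dask / y_train_dask from your script, with parameter values picked from your grid):

# Rough sketch of an isolated check: fit cuML's Dask RF directly, no GridSearchCV.
# Assumes the same cluster/client and X_train_dask / y_train_dask as in the script above.
from cuml.dask.ensemble import RandomForestClassifier as cuRF

rf = cuRF(n_estimators=10, max_depth=10, random_state=42, n_streams=1)
rf.fit(X_train_dask, y_train_dask)

y_pred = rf.predict(X_train_dask)
print(y_pred.compute().to_numpy()[:10])  # just to confirm the round trip works at all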

Hello Guillaume! Thanks for your reply. I did try to simplify the problem by looping through each element of my param grid and fitting a RF for each combination, and it worked. The problem really appears when I do this:

grid_search = dcv.GridSearchCV(model_rf, param_grid, cv=5, scoring='f1_weighted')
grid_search.fit(X_train_dask, y_train_dask) # Fit with Dask arrays

It seems that .fit does not handle the format (or something else) of the GridSearchCV object well.

Also, I tried running the code on CPU only and it seems to work. I do get some warnings, which are:

/home/user/miniconda3/envs/ML/bin/python /mnt/c/Users/user/PythonProjects/Snowflake/Stats/Clean RF CPUs - Forum.py 
2024-08-15 14:48:13,463 - distributed.scheduler - WARNING - Detected different `run_spec` for key '_indexable-dc0a49527e63820404d1999a66ef5c11' between two consecutive calls to `update_graph`. This can cause failures and deadlocks down the line. Please ensure unique key names. If you are using a standard dask collections, consider releasing all the data before resubmitting another computation. More details and help can be found at https://github.com/dask/dask/issues/9888. 
Debugging information
---------------------
old task state: released
old run_spec: (<function _indexable at 0x7f3f13a712d0>, ('finalize-741384a2-ce86-4e7c-964e-712df5cc0695',), {})
new run_spec: (<function _indexable at 0x7f3f13a712d0>, ('finalize-2b82efa5-ebda-443e-a09b-1e206476d30f',), {})
old token: ('tuple', [('08a73b82b61f2ab8b68d8b7234df74eac8237585', []), ('tuple', ['finalize-741384a2-ce86-4e7c-964e-712df5cc0695']), ('dict', [])])
new token: ('tuple', [('08a73b82b61f2ab8b68d8b7234df74eac8237585', []), ('tuple', ['finalize-2b82efa5-ebda-443e-a09b-1e206476d30f']), ('dict', [])])
old dependencies: {'finalize-741384a2-ce86-4e7c-964e-712df5cc0695'}
new dependencies: {'finalize-2b82efa5-ebda-443e-a09b-1e206476d30f'}

              precision    recall  f1-score   support

           1       0.51      0.49      0.50       491
           2       0.51      0.52      0.51       490

    accuracy                           0.51       981
   macro avg       0.51      0.51      0.51       981
weighted avg       0.51      0.51      0.51       981

2024-08-15 14:48:13,547 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:35333' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-ffea317c1a274dadeb00f49edc6215d2', 6), ('frompandas-ffea317c1a274dadeb00f49edc6215d2', 2)} (stimulus_id='handle-worker-cleanup-1723747693.5472105')
2024-08-15 14:48:13,547 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:45091' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-ffea317c1a274dadeb00f49edc6215d2', 8)} (stimulus_id='handle-worker-cleanup-1723747693.5476315')
2024-08-15 14:48:13,548 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:35137' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-ffea317c1a274dadeb00f49edc6215d2', 3), ('frompandas-ffea317c1a274dadeb00f49edc6215d2', 7)} (stimulus_id='handle-worker-cleanup-1723747693.5483158')
2024-08-15 14:48:13,549 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:36969' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-ffea317c1a274dadeb00f49edc6215d2', 4), ('frompandas-ffea317c1a274dadeb00f49edc6215d2', 0)} (stimulus_id='handle-worker-cleanup-1723747693.54905')
2024-08-15 14:48:13,549 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:44545' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-ffea317c1a274dadeb00f49edc6215d2', 9)} (stimulus_id='handle-worker-cleanup-1723747693.549728')
2024-08-15 14:48:13,550 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:35933' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-ffea317c1a274dadeb00f49edc6215d2', 5), ('frompandas-ffea317c1a274dadeb00f49edc6215d2', 1)} (stimulus_id='handle-worker-cleanup-1723747693.5500755')

Process finished with exit code 0
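
To be clear about what running on CPU only means: I essentially swapped LocalCUDACluster for a plain LocalCluster, cuML's Dask RandomForest for scikit-learn's RandomForestClassifier, and dask_cudf for dask.dataframe. A stripped-down sketch of that substitution (not my exact script, just the idea, with placeholder data and parameters):

# Sketch of the CPU-only check: same pipeline shape, GPU pieces swapped out.
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from sklearn.ensemble import RandomForestClassifier
import dask_ml.model_selection as dcv

if __name__ == "__main__":
    client = Client(LocalCluster())  # CPU cluster instead of LocalCUDACluster

    rng = np.random.default_rng(42)
    pdf = pd.DataFrame({
        "A": rng.integers(0, 5, 1000),
        "B": rng.integers(0, 5, 1000),
        "y": rng.integers(0, 2, 1000),
    })
    X = dd.from_pandas(pdf[["A", "B"]], npartitions=4)
    y = dd.from_pandas(pdf["y"], npartitions=4)

    param_grid = {"n_estimators": [10, 20], "max_depth": [5, 10]}
    grid_search = dcv.GridSearchCV(
        RandomForestClassifier(random_state=42),  # scikit-learn estimator instead of cuml.dask
        param_grid,
        cv=3,
        scoring="f1_weighted",
    )
    grid_search.fit(X, y)  # completes on CPU, unlike the GPU version
    print(grid_search.best_params_)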

So what do you think is causing the problem?

Thanks a lot for your help.

OK, so this definitely seems related to some incompatibility between dask-cuda and dask-ml. Ccing @jacobtomlinson, but I think you should open an issue with a reproducer on the dask-cuda GitHub issue tracker.

Yeah, I agree that opening an issue on dask-cuda with a minimal reproducer would be the best way forwards.
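
A stripped-down script along these lines could serve as a starting point for the reproducer (untested sketch with placeholder data and parameter values, trimmed down to just the failing GridSearchCV.fit call):

# Hypothetical minimal reproducer sketch: random data in a dask_cudf DataFrame,
# then GridSearchCV.fit with cuML's Dask RandomForest.
import numpy as np
import cudf
import dask_cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from cuml.dask.ensemble import RandomForestClassifier as cuRF
import dask_ml.model_selection as dcv

if __name__ == "__main__":
    client = Client(LocalCUDACluster())

    rng = np.random.default_rng(42)
    gdf = cudf.DataFrame({
        "A": rng.integers(0, 5, 1000).astype("float32"),
        "B": rng.integers(0, 5, 1000).astype("float32"),
        "y": rng.integers(0, 3, 1000).astype("int32"),
    })
    X = dask_cudf.from_cudf(gdf[["A", "B"]], npartitions=2)
    y = dask_cudf.from_cudf(gdf["y"], npartitions=2)

    param_grid = {"n_estimators": [5, 10], "max_depth": [5, 10]}
    grid_search = dcv.GridSearchCV(cuRF(random_state=42), param_grid, cv=2)
    grid_search.fit(X, y)  # expected to raise the same HighLevelGraph serialization error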

Thanks for your replies guys, I will do that. Were you able to run the code without errors on your side?

Thanks.