'Could not serialize object of type HighLevelGraph'

Hi everyone,

I'm encountering an error in my Python code and could use some assistance. I'm getting the following TypeError:

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 10000 layers.\n<dask.highlevelgraph.HighLevelGraph object.

Any insights or suggestions on how to address this issue would be greatly appreciated.

Hi @avocadola, welcome to the Dask community!

Could you provide a reproducer showing how you get this error?

Maybe it's because your graph is too big; a HighLevelGraph with 10000 layers seems huge.
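
For reference, you can inspect how many layers a collection's graph has. Here is a minimal sketch (using dask.array instead of dask_cudf, purely for illustration):

import dask.array as da

# Each operation on a collection adds a layer to its HighLevelGraph
x = da.ones((1000, 1000), chunks=(100, 100))
y = (x + 1).sum()

# The number of layers in the underlying graph
print(len(y.__dask_graph__().layers))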

Thank you very much for your response. I am new to Dask and parallel computing, so my approach may be wrong.

My goal is to speed up this for loop:
brier_scores_list = [train_and_score(i) for i in range(X_train_cudf.shape[1])]

Notes:
I am using a single GPU.

This is my first approach, along with the error message I got:

dask.config.set(scheduler='processes', serialization='pickle')

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0")
client = Client(cluster)

# Generate sample data

X, y = make_classification(n_samples=500, n_features=10000, n_classes=2, random_state=42)

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Convert to Dask cuDF DataFrame/Series
X_train_dask_cudf = dask_cudf.from_cudf(X_train_cudf, npartitions=20).persist(optimize_graph=True)
X_test_dask_cudf = dask_cudf.from_cudf(X_test_cudf, npartitions=20).persist(optimize_graph=True)
y_train_dask_cudf = dask_cudf.from_cudf(y_train_cudf, npartitions=20).persist(optimize_graph=True)
y_test_dask_cudf = dask_cudf.from_cudf(y_test_cudf, npartitions=20).persist(optimize_graph=True)

def train_and_score(index, X_train, X_test, y_train, y_test):
    X_train_single_feature = X_train.iloc[:, index:index+1]  
    X_test_single_feature = X_test.iloc[:, index:index+1]

    # Initialize and train the model
    model = LogisticRegression(client=client)
    model.fit(X_train_single_feature, y_train)

    # Predict probabilities and calculate Brier score
    y_pred_test = model.predict_proba(X_test_single_feature)[:, 1] 
    brier_score = ((y_test - y_pred_test) ** 2).mean()  

    return brier_score

def compute_brier_scores(feature_index):
    return train_and_score(feature_index, X_train_dask_cudf, X_test_dask_cudf, y_train_dask_cudf, y_test_dask_cudf)


# Using Dask's 'delayed' to parallelize
delayed_results = [dask.delayed(compute_brier_scores)(i) for i in range(X_train.shape[1])]

start_time = time.time()

# Trigger computation
brier_scores = dask.compute(*delayed_results)

end_time = time.time()

ERROR MESSAGE:
TypeError: cannot pickle '_asyncio.Task' object
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 10000 layers.

I had another approach, where I was creating batches of tasks to decrease the number of layers, and these were the error messages I got:

TypeError: cannot pickle '_asyncio.Task' object
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 12 layers.
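
For reference, the batching looked roughly like this (a simplified sketch, not my exact code; the batch size is illustrative):

batch_size = 100

def score_batch(indices):
    # Score a whole batch of features inside a single task
    return [compute_brier_scores(i) for i in indices]

batches = [list(range(i, min(i + batch_size, X_train.shape[1])))
           for i in range(0, X_train.shape[1], batch_size)]
delayed_batches = [dask.delayed(score_batch)(b) for b in batches]
results = dask.compute(*delayed_batches)
brier_scores = [score for batch in results for score in batch]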

It seems there are missing parts in your code: the imports, to begin with, so we can see which packages you are using; also, at some point an X_train_cudf variable is referenced but never created.

dask.config.set(scheduler='processes', serialization='pickle')

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0")

If you are using a LocalCUDACluster, there is no point in setting the scheduler config to 'processes'. Also, why do you force serialization to pickle?
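
Once a distributed Client is created, it registers itself as the default scheduler, so something like this should be enough (a minimal sketch):

from dask_cuda import LocalCUDACluster
from distributed import Client

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0")
client = Client(cluster)  # becomes the default scheduler for dask.compute
# ... build your delayed tasks and call dask.compute as usual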

Do you also have a more complete stack trace?