Dask - Model training

I want to train one regression model and one classification model.
For regression just a simple Linear Regression, and for classification a RandomForestClassifier.

But I keep getting a memory error:

2024-06-04 15:29:31,055 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:54482 (pid=17536) exceeded 95% memory budget. Restarting…

2024-06-04 15:29:31,221 - distributed.scheduler - WARNING - Removing worker ‘tcp://127.0.0.1:54482’ caused the cluster to lose already computed task(s), which will be recomputed elsewhere:

2024-06-04 15:29:32,556 - distributed.nanny - WARNING - Restarting worker

I am running this on a local machine:

  • Intel Core i7-1065G7 1.3-3.9GHz
  • 16.0 GB RAM

My actual code is:

# Initialize Dask client
client = Client(n_workers=4, threads_per_worker=1)
client

# Read all csv files with defined data types
dfs = [dd.read_csv(f'{year}.csv', dtype=dtypes, blocksize='100MB') for year in [2016, 2017, 2018]]

# Concatenate the individual DataFrames
airline_delays = dd.concat(dfs, interleave_partitions=True)

# # Set the number of partitions
# airline_delays = airline_delays.repartition(partition_size='100MB')

# Drop unnecessary columns
airline_delays = airline_delays.drop(['FL_DATE', 'Unnamed: 27'], axis=1)

# Persist the DataFrame to manage memory usage better
airline_delays = airline_delays.persist()

# Convert string[pyarrow] and object columns to categorical
categorical_cols = airline_delays.select_dtypes(include=['string[pyarrow]', 'object']).columns
for col in categorical_cols:
    airline_delays[col] = airline_delays[col].astype('category')
    
# Encode categorical columns
le = LabelEncoder()
for col in categorical_cols:
    # Use map_partitions to apply LabelEncoder on each partition
    airline_delays[col] = airline_delays[col].map_partitions(lambda s: le.fit_transform(s.astype(str)))

# Verify that all columns are now numerical
print(airline_delays.dtypes)

# Split the data into features and target
X = airline_delays.drop('ARR_DELAY', axis=1)
y = airline_delays['ARR_DELAY']

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True)

# Train the Linear Regression model
linear_regression = LinearRegression()
linear_regression.fit(X_train, y_train)

# Predict and evaluate
y_pred = linear_regression.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse}")

# Optionally, shut down the Dask client after processing
# client.close()

(Screenshot: Dask dashboard during the training process)

Hi @DataScientist, welcome to the Dask community!

First, just to be sure: are you using dask-ml (for the LinearRegression class, for example)?

How large is your input dataset? Do you have enough memory to persist it?
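
For example, you could check it with something like this (just a rough sketch, assuming the airline_delays DataFrame from your code above):

# Approximate in-memory size of the Dask DataFrame, in bytes
total_bytes = airline_delays.memory_usage(deep=True).sum().compute()
print(f"In-memory size: {total_bytes / 1e9:.2f} GB")

# Number of partitions it is split into
print(airline_delays.npartitions, "partitions")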

In your dashboard screenshot, we can see a repartition operation in progress; I'm not sure what is triggering it in your code?

Hi @guillaumeeb !

Actually no, but I tried importing from dask-ml before.
These are all my imports:

import pandas as pd
import numpy as np
import joblib
import dask
import dask.dataframe as dd
import dask.array as da

import matplotlib.pyplot as plt
import seaborn as sns
# import hvplot.dask
import altair as alt

from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LinearRegression
from sklearn.metrics import classification_report, accuracy_score
from dask_ml.model_selection import train_test_split
from dask_ml.preprocessing import DummyEncoder, LabelEncoder
from dask_ml.compose import ColumnTransformer
from xgboost import XGBClassifier, DMatrix
from dask_ml.metrics import accuracy_score, mean_squared_error
from dask.distributed import Client, progress

My input dataset is around 2-2.5 GB.
After reading the CSV files I have 21 partitions, each one 100 MB.

# Read all csv files with defined data types
dfs = [dd.read_csv(f'{year}.csv', dtype=dtypes, blocksize='100MB') for year in [2016, 2017, 2018]]

Yeah, I also see that everything works fine for a couple of calculations, and after that there is a repartition, but I am actually just trying to train the model, not doing any repartitioning myself.

I think that it is part of the Dask logic or the training algorithm :)

But I don't know what triggers it.

I'm not sure what you mean here: do you use dask-ml or not?

I don't see anything in your code that would trigger this repartition. You should try executing the code step by step to better understand what is happening.
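
For instance, you can materialize one step at a time and watch the dashboard or the progress bar to see where the repartition (and the memory pressure) appears; a minimal sketch, reusing the client and the progress function you already import:

from dask.distributed import wait

# Trigger the computation of the current step and follow it
airline_delays = airline_delays.persist()
progress(airline_delays)  # shows the running tasks, including any repartition
wait(airline_delays)      # block until this step is fully computed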

No. I don’t use dask-ml.

As I see it, dask-ml has a LinearRegression model, but there is no RandomForestClassifier for the classification model.

What should I use instead for the classification task?

Well, if you don't use dask-ml and use scikit-learn instead, all your data will be loaded into memory at some point, which can cause the failure.
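
For the regression part, something along these lines should stay out-of-core (a minimal sketch, assuming the X_train/X_test/y_train/y_test Dask DataFrames from your code above; dask-ml's LinearRegression generally expects Dask arrays, hence the conversions):

from dask_ml.linear_model import LinearRegression
from dask_ml.metrics import mean_squared_error

# Convert the Dask DataFrames/Series to Dask arrays with known chunk sizes
X_train_arr = X_train.to_dask_array(lengths=True)
y_train_arr = y_train.to_dask_array(lengths=True)
X_test_arr = X_test.to_dask_array(lengths=True)
y_test_arr = y_test.to_dask_array(lengths=True)

# Fit the dask-ml linear regression, which works on the chunked array
lr = LinearRegression()
lr.fit(X_train_arr, y_train_arr)

# Predict and evaluate without pulling everything onto one worker
y_pred = lr.predict(X_test_arr)
print("MSE:", mean_squared_error(y_test_arr, y_pred))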

There is no implementation of RandomForest for incremental or partial learning as far as I know. You could try XGBoost, which has a Dask extension and is in the same family.
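
For the classification part, a rough sketch with XGBoost's Dask interface could look like this (y_class_train is a hypothetical Dask Series with your class labels, e.g. delayed / not delayed, which is not in your current code):

from xgboost.dask import DaskXGBClassifier

# Distributed gradient-boosted trees, trained directly on Dask collections
clf = DaskXGBClassifier(n_estimators=100, tree_method="hist")
clf.client = client  # reuse the already running Dask client

clf.fit(X_train, y_class_train)
predictions = clf.predict(X_test)

This keeps the data distributed on the workers instead of collecting the whole dataset into a single process.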

Thanks a lot for your solution! :partying_face: