I want to train one regression model and one classification model: a plain LinearRegression for the regression and a RandomForestClassifier for the classification.
But I keep running into memory errors:
2024-06-04 15:29:31,055 - distributed.nanny.memory - WARNING - Worker tcp://127.0.0.1:54482 (pid=17536) exceeded 95% memory budget. Restarting...
2024-06-04 15:29:31,221 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:54482' caused the cluster to lose already computed task(s), which will be recomputed elsewhere:
2024-06-04 15:29:32,556 - distributed.nanny - WARNING - Restarting worker
I am running this on a local machine:
- Intel Core i7-1065G7 1.3-3.9GHz
- 16.0 GB RAM
My actual code is:
# Imports (assuming the dask_ml versions of train_test_split, LinearRegression
# and mean_squared_error, since everything below runs on Dask collections)
from dask.distributed import Client
import dask.dataframe as dd
from sklearn.preprocessing import LabelEncoder
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
from dask_ml.metrics import mean_squared_error

# Initialize Dask client
client = Client(n_workers=4, threads_per_worker=1)
client
# Read all csv files with defined data types
dfs = [dd.read_csv(f'{year}.csv', dtype=dtypes, blocksize='100MB') for year in [2016, 2017, 2018]]
# Concatenate the individual DataFrames
airline_delays = dd.concat(dfs, interleave_partitions=True)
# # Set the number of partitions
# airline_delays = airline_delays.repartition(partition_size='100MB')
# Drop unnecessary columns
airline_delays = airline_delays.drop(['FL_DATE', 'Unnamed: 27'], axis=1)
# Persist the DataFrame to manage memory usage better
airline_delays = airline_delays.persist()
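# (note: persist() materializes every partition of the concatenated DataFrame
#  in the workers' memory, so the full 2016-2018 dataset has to fit in RAM)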
# Convert string[pyarrow] and object columns to categorical
categorical_cols = airline_delays.select_dtypes(include=['string[pyarrow]', 'object']).columns
for col in categorical_cols:
    airline_delays[col] = airline_delays[col].astype('category')
# Encode categorical columns
le = LabelEncoder()
for col in categorical_cols:
    # Use map_partitions to apply LabelEncoder on each partition
    airline_delays[col] = airline_delays[col].map_partitions(lambda s: le.fit_transform(s.astype(str)))
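# (note: the encoder above is re-fit independently on every partition)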
# Verify that all columns are now numerical
print(airline_delays.dtypes)
# Split the data into features and target
X = airline_delays.drop('ARR_DELAY', axis=1)
y = airline_delays['ARR_DELAY']
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=True)
# Train the Linear Regression model
linear_regression = LinearRegression()
linear_regression.fit(X_train, y_train)
# Predict and evaluate
y_pred = linear_regression.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Mean Squared Error: {mse}")
# Optionally, shut down the Dask client after processing
# client.close()
The memory errors appear during the training process.
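For the classification part, this is roughly what I'm planning once the regression works (just a sketch; pulling a sample down to pandas and deriving a "delayed" label from ARR_DELAY are assumptions on my side, not code I've run):

from sklearn.ensemble import RandomForestClassifier

# Sketch only: train scikit-learn's RandomForestClassifier on a sample that fits in memory
sample = airline_delays.sample(frac=0.1).compute()   # pull ~10% of the rows into pandas
X_clf = sample.drop('ARR_DELAY', axis=1)
y_clf = (sample['ARR_DELAY'] > 15).astype(int)       # hypothetical binary "delayed" label

rf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
rf.fit(X_clf, y_clf)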