Hello everybody, and thanks in advance for your help.
Abstract:
I am trying to perform an OLS regression with dask-ml's LinearRegression. Even when debugging the code with a small data sample, the modelling never finishes. In the dashboard I can see that the task “local_update” takes by far the most time, especially relative to the other tasks. Unfortunately I do not fully understand what this task represents, why it takes so long, and how I can speed it up. I assume there is an error in my understanding, and therefore in my way of using the model and/or writing the code (posted below). I am not new to Python, but not an expert either, and I am happy for every hint, background information, or comment regarding my issue.
Hardware / Software:
Python installed via Anaconda (Anaconda Navigator v. 2.6.3)
working in Jupyter Notebook (v. 7.2.2)
CPU: i5-9500 (6 physical and 6 logical cores at 4.1 GHz)
RAM: 64 GB (4x16 GB at 2666 MHz)
SSD (500 GB reserved as swap file): about 3,000 MB/s read/write speed (M.2 PCIe 3.0)
Description with background information:
I have a database that grows daily, and I want to perform an OLS regression on it regularly.
So far I have used statsmodels without any problems, but as time went by and my database grew, it slowed down. To use the OLS regression in statsmodels, my swap file has to have a certain size (currently around 350 GB if I use all the data I want to include in the modelling).
Recently I learned about the option to partition the data and speed things up with multiprocessing and multithreading. That's when I learned about Dask and dask-ml, which should (if I use them right) be much faster; please correct me if my understanding is wrong.
But all my attempts so far have ended in a manual restart of the program, because the dask-ml LinearRegression never finishes.
So, let's take a look at the code.
Assume a pandas DataFrame df_adj. Normally it would (currently) have about 400,000 rows and 22,000 columns. For setting up my code I use a smaller sample, and df_adj consists of about 35,000 rows and 2,300 columns.
Performing an OLS regression with statsmodels on this sample works without any problem within a minute (just in case it is relevant: R² of 74%). So the data should be fine.
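In essence, the statsmodels part boils down to something like this (simplified sketch, consistent with the full script posted further down; the first column of df_adj is the target):

import statsmodels.api as sm

X = sm.add_constant(df_adj.iloc[:, 1:])  # add the intercept column
y = df_adj.iloc[:, 0]
result = sm.OLS(y, X).fit()
print(result.rsquared)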
Based on the same pandas DataFrame df_adj, I want to perform the regression with Dask. As far as I know, I have to start a client and transform the pandas DataFrame into a Dask array (correct so far?).
The partitions, processes, and threads will be calculated and used as variables in the final code, but in my example they are represented by fixed numbers.
According to a quick check, every array chunk in X_train should be about 203 MB.
And the line “model_OLS.fit(X_train, y_train)” is the one that never finishes. For the same dataset that statsmodels modelled in about 1 minute, the LinearRegression used here was still not finished after 4 hours, when I killed the process (I performed this comparison multiple times).
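Roughly, the Dask part looks like this (sketch with the fixed example numbers; the dynamically computed version is in the full script below):

from dask.distributed import Client
import dask.dataframe as dd
from dask_ml.linear_model import LinearRegression

client = Client(processes=True, n_workers=3, threads_per_worker=1)

ddf = dd.from_pandas(df_adj, npartitions=3)
X_train = ddf.iloc[:, 1:].to_dask_array(lengths=True)
y_train = ddf.iloc[:, 0].to_dask_array(lengths=True)

model_OLS = LinearRegression(fit_intercept=True)
model_OLS.fit(X_train, y_train)  # this call never finishes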
There is no error.
By using "model_OLS = LinearRegression(fit_intercept=True, max_iter=10, tol=1e-2) " the model will come to an end after about 5 minutes but of course the results are far from a good fit.
So I checked the Dashboard and saw, that every worker seems to work, most of the tasks are pretty fast and there was absolutely no memory problem (about 6GB of my physical RAM was used). Outstanding for me are the local_update tasks that seem to slow the process down.
(Of course I testes a larger sample of my total database as well but with no relevant differences to this smaller scenario)
So - i thought using dask-ml with multiple processes and (in the final case) multithreading should be faster than statsmodel. Is my usecase wrong? Did I forget an important step in my code that is necessary for the correct implementation? Did I made another silly mistake I just don`t see by myself?
I am thankfull for any comment that helps me to understand if my code is wrong or my understanding is wrong.
The first thing I would do is check the performance of scikit-learn's LinearRegression, which is what Dask-ML's is based on. Maybe statsmodels is much better?
The second thing would be to build a full reproducer that is not tied to your data, so we can better help you in this forum.
Not necessarily faster. Currently your solution uses a high-speed SSD as virtualized memory. If your dataset fits in RAM, then standard methods like statsmodels or scikit-learn can be faster. Parallelization comes with a cost, especially in machine learning when fitting a single model in parallel. It should not be that much slower, though.
Starting a Client is not mandatory, but it is certainly important for using multiprocessing and for accessing the dashboard. Transforming the pandas DataFrame into a Dask array is not quite the right way to put it either. You do need a Dask array in the end, but ideally you would not go through pandas; instead you would read a Dask DataFrame or array directly from disk.
Also, if your data fits in RAM like in this example, you should persist the Dask Array into memory before running the LinearRegression.
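Something like this (sketch):

# .persist() keeps a Dask Array whose chunks are materialized in the
# workers' memory, while .compute() would pull everything back into a
# single NumPy array on the client.
X = ddf.iloc[:, 1:].to_dask_array(lengths=True).persist()
y = ddf.iloc[:, 0].to_dask_array(lengths=True).persist()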
Beware: it would certainly be much better to use values compatible with your hardware for processes and threads, and partitions should be of a size that maximises your code's speed, not tied to the initial dataset size. You don't need 3 partitions just because you have 3 workers; maybe you could try with 12, for example (see the sketch below)!
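For example (sketch):

# The partition count is independent of the worker count
ddf = dd.from_pandas(df, npartitions=12)
# or, for an existing Dask DataFrame:
ddf = ddf.repartition(npartitions=12)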
I'm not sure what it corresponds to, or why you say it is slowing things down. An MVCE would be ideal to see whether there really is a performance problem.
I took some time to check some questions and try out some ideas before commenting again, so I could rule out careless mistakes on my side, but I did not make any progress.
Creating the Dask DataFrame is not a time issue in the code. But since I was not sure whether there was any delayed, time-consuming task, I saved the DataFrame as a Parquet file and read it directly into a Dask DataFrame. In short: there was no difference in the performance, the time, or the visual picture of the tasks in the dashboard, so I removed this step again (at least for the moment).
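The round-trip looked roughly like this (sketch; the path is illustrative):

df_adj.to_parquet("df_adj.parquet")      # write the pandas DataFrame once
ddf = dd.read_parquet("df_adj.parquet")  # read directly into a Dask DataFrame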
I tried several combinations of workers, threads, and partitions, but did not see any difference in the outcome. The reason I mentioned the dynamic computation via code is the idea of using the code on different machines in the future and basing the number of workers, threads, and partitions on the individual number of cores and the available RAM. One version of that is shown in the example code I post below.
The reason for 3 partitions and workers in the example was that (in my understanding, please correct me otherwise) I thought there is no need for more workers than partitions, and that partitions should be between 100 MB and 2 GB, with the optimum between 200 and 500 MB. If you know more about that, please let me know.
I thought a lot about this comment, but I do not understand exactly what you mean. Could you please specify what it means to persist the Dask array into memory? This was one of the reasons I experimented with the Dask array creation (e.g. with the Parquet file), but I could not create a scenario that looks different to me.
My understanding was that I should compute the array before using it for the regression. But then it becomes a NumPy array, and the regression code results in an error message, so I guess that was a misunderstanding on my side.
So I created an example code that generates a DataFrame and performs OLS regressions with statsmodels, sklearn, sklearn with multithreading, and Dask (100000x10000, adjustable; also tested with different sizes).
(I understand that the main advantage of Dask is working with data that does not fit in RAM, but for testing purposes I need a benchmark and a comparison, while keeping in mind that things might shift for larger datasets.)
I use a seed to create the data, to ensure that repeated runs of the code give matching results.
In this case I achieved matching R² values with the statsmodels, sklearn, and sklearn multithreading regressions of about 9.87%, taking 329, 148, and 147 seconds respectively.
The Dask regression (19 workers with 2 threads each and 38 partitions) ran for 6,803 seconds (close to 2 hours) and achieved an R² of about 6.7%.
I cannot understand what leads to this long computing time and the lower R² (other statistical values behave comparably to the described R² difference).
Performed in Jupyter Notebook.
Code
# Import necessary libraries
import os
import sys
import time
import psutil

# Define a function to check, install, and import packages dynamically
def install_and_import(package_name, import_name=None):
    try:
        if import_name is None:
            import_name = package_name
        __import__(import_name)
    except ImportError:
        print(f"Installing {package_name}...")
        os.system(f"{sys.executable} -m pip install {package_name}")
        globals()[import_name] = __import__(import_name)

# Check and install all required packages
required_packages = {
    "numpy": "numpy",
    "pandas": "pandas",
    "statsmodels": "statsmodels",
    "scikit-learn": "sklearn",
    "dask[complete]": "dask",
    "dask-ml": "dask_ml",
    "multiprocessing": "multiprocessing",  # Add multiprocessing package
}
for package, module_name in required_packages.items():
    install_and_import(package, module_name)

# Import libraries after ensuring they are installed
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
import dask.dataframe as dd
from dask_ml.linear_model import LinearRegression as DaskLinearRegression
from dask.distributed import Client, LocalCluster
import multiprocessing  # Import multiprocessing
import warnings

# Suppress specific Dask warnings
warnings.filterwarnings("ignore", message="Sending large graph of size")
# Generate reproducible dataset
def generate_dataset(seed=42):
    np.random.seed(seed)
    # Generate random data
    data = np.random.rand(100000, 10000).round(6)
    # Zero out 100 random entries in each selected column
    # (simulating the many dummy variables in the real data)
    selected_columns = list(range(200, 4000))
    for col in selected_columns:
        idx = np.random.choice(100000, 100, replace=False)
        data[idx, col] = 0
    # Create DataFrame
    return pd.DataFrame(data, columns=[f"col_{i}" for i in range(10000)])

def validate_dataset_generation():
    print("Validating dataset generation consistency...")
    df1 = generate_dataset()
    df2 = generate_dataset()
    if df1.equals(df2):
        print("Dataset generation is consistent.")
    else:
        print("Dataset generation is inconsistent. Please check the implementation.")
    del df1
    del df2
def run_statsmodels_ols(df):
    print("\nRunning Statsmodels OLS Regression...")
    start_time = time.time()
    X = df.iloc[:, 1:]
    y = df.iloc[:, 0]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()
    duration = time.time() - start_time
    residuals = model.resid
    print("\n[Statsmodels OLS Results]")
    print(f"R²: {model.rsquared:.6f}")
    print(f"Residual Std Dev: {np.std(residuals):.6f}")
    print(f"Residual Mean: {np.mean(residuals):.6f}")
    print(f"Duration: {duration:.2f} seconds")
    return duration

def run_sklearn_ols(df):
    print("\nRunning Scikit-learn OLS Regression...")
    start_time = time.time()
    X = df.iloc[:, 1:]
    y = df.iloc[:, 0]
    model = LinearRegression()
    model.fit(X, y)
    y_pred = model.predict(X)
    residuals = y - y_pred
    duration = time.time() - start_time
    print("\n[Scikit-learn OLS Results]")
    print(f"R²: {r2_score(y, y_pred):.6f}")
    print(f"Residual Std Dev: {np.std(residuals):.6f}")
    print(f"Residual Mean: {np.mean(residuals):.6f}")
    print(f"Duration: {duration:.2f} seconds")
    return duration
def run_dask_ols(df):
    print("\nRunning Dask-ML OLS Regression...")
    # Maximum memory size per partition
    max_partition_size_mb = 200
    # Calculate the total number of elements in the DataFrame
    num_elements = df.size  # Number of elements in the DataFrame (rows * columns)
    # Size of a float64 value (8 bytes)
    bytes_per_element = 8
    # Calculate total size in bytes
    total_bytes = num_elements * bytes_per_element
    # Convert total size to MB
    total_size_mb = total_bytes / (1024**2)
    print(f"Total size of the DataFrame as float64: {total_size_mb:.2f} MB")
    # Number of logical cores (including Hyper-Threading)
    num_cores = os.cpu_count()
    print(f"Number of logical cores: {num_cores}")
    num_cores = round(num_cores / 2, 0)
    # Number of partitions based on memory requirements
    npartitions_based_on_memory = int(total_size_mb / max_partition_size_mb)
    # Final number of partitions: at least as many as cores, but also based on memory size
    npartitions_zahl = max(npartitions_based_on_memory, num_cores)
    npartitions_zahl = int(npartitions_zahl)
    print(f"Recommended number of partitions: {npartitions_zahl}")
    # Number of physical cores on Windows
    num_physical_cores = psutil.cpu_count(logical=False)  # Returns only physical cores
    print(f"Number of physical cores: {num_physical_cores}")
    num_physical_cores = psutil.cpu_count(logical=False) - 1  # leave one core free
    threads_per_worker_zahl = 2
    if npartitions_zahl <= num_physical_cores:
        threads_per_worker_zahl = 1
    n_workers_zahl_thread_help = int(np.floor(npartitions_zahl / threads_per_worker_zahl))
    n_workers_zahl = min(num_physical_cores, n_workers_zahl_thread_help)
    # Calculation of memory allocation per worker (500 GB / number of physical cores)
    # memory_usage = round(500 / n_workers_zahl, 0)  # Example: 500 GB divided by number of cores
    memory_usage = 16
    memory_limit = f"{memory_usage}GB"  # Memory usage as a string, e.g. "16GB"
    # Start the Dask client with multi-core support
    client = Client(
        processes=True,
        n_workers=n_workers_zahl,
        threads_per_worker=threads_per_worker_zahl,
        memory_limit=memory_limit,
    )
    # Output for verification
    print(f"Memory usage per worker: {memory_limit}")
    # Display scheduler information
    print(client)
    print(f"Dask Dashboard: {client.dashboard_link}")
    duration = None  # Initialize duration
    try:
        # Convert to Dask DataFrame
        ddf = dd.from_pandas(df, npartitions=npartitions_zahl)
        print("Test_1")
        X = ddf.iloc[:, 1:].to_dask_array(lengths=True)
        y = ddf.iloc[:, 0].to_dask_array(lengths=True)
        # X = X.compute()  # would turn X into a NumPy array -> dask-ml errors out
        # y = y.compute()
        print("Test_2")
        # Train Dask-ML Linear Regression
        start_time = time.time()
        model = DaskLinearRegression()
        model.fit(X, y)
        # Get the coefficients and store them in a DataFrame
        coefficients = model.coef_
        coefficient_df = pd.DataFrame(coefficients, columns=["Coefficient"])
        coefficient_df.index = df.columns[1:]  # Set feature names as the index
        # Save the coefficients to an Excel file
        # coefficient_df.to_excel("coefficients.xlsx")
        # Calculate predictions and residuals
        y_pred = model.predict(X).compute()
        residuals = y.compute() - y_pred
        duration = time.time() - start_time
        # Output the results
        print("\n[Dask-ML OLS Results]")
        print(f"R²: {model.score(X, y):.6f}")
        print(f"Residual Std Dev: {np.std(residuals):.6f}")
        print(f"Residual Mean: {np.mean(residuals):.6f}")
        print(f"Duration: {duration:.2f} seconds")
    except UnicodeDecodeError as e:
        print(f"UnicodeDecodeError encountered: {e}")
        print("Ensure all data is UTF-8 encoded.")
    except Exception as e:
        print(f"Error during Dask-ML OLS: {e}")
    finally:
        client.close()
    return duration if duration is not None else float("inf")
print(f"Script started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Validate dataset generation
validate_dataset_generation()
# Generate dataset
print("\nGenerating dataset...")
df = generate_dataset()
print("Dataset generated.")
print(df)
print(df.describe())
print(df.iloc[:, 300].describe())
# Run OLS regressions
statsmodels_duration = run_statsmodels_ols(df)
print(f"Statsmodels OLS Duration: {statsmodels_duration:.2f} seconds\n")
sklearn_duration = run_sklearn_ols(df)
print(f"Scikit-learn OLS Duration: {sklearn_duration:.2f} seconds\n")
def run_sklearn_ols_with_n_jobs(df, num_workers):
    # Start the timer
    start_time = time.time()
    # X is the matrix of features (all columns except the target column)
    X = df.iloc[:, 1:].values
    # y is the target column (first column)
    y = df.iloc[:, 0].values
    # Create the model with n_jobs=number of workers
    model = LinearRegression(n_jobs=num_workers)
    # Fit the model (training the data)
    model.fit(X, y)
    # Predictions
    y_pred = model.predict(X)
    # Calculate R²
    r2 = r2_score(y, y_pred)
    residuals = y - y_pred
    residual_std_dev = np.std(residuals)
    residual_mean = np.mean(residuals)
    # Calculate the duration
    duration = time.time() - start_time
    # Output the results
    print("\n[OLS with n_jobs and LinearRegression]")
    print(f"R²: {r2:.6f}")
    print(f"Residual Standard Deviation: {residual_std_dev:.6f}")
    print(f"Residual Mean: {residual_mean:.6f}")
    print(f"Duration: {duration:.2f} seconds")
    return duration
num_workers = 20 # Number of workers (parallel threads)
sklearn_multithreading_duration = run_sklearn_ols_with_n_jobs(df, num_workers=num_workers)
# Customized output of the duration
print(f"Multithreading Scikit-learn OLS Duration with {num_workers} workers (using n_jobs): {sklearn_multithreading_duration :.2f} seconds\n")
dask_duration = run_dask_ols(df)
print(f"Dask-ML OLS Duration: {dask_duration:.2f} seconds\n")
In case there is any confusion about the number of workers: I upgraded my system in the meantime to an i7-14700KF, so the workers are calculated as shown in the code, based on my system's resources.
The adjustments to the generated dataset should help simulate the high number of dummy variables in the actual data I want to work with (75% might not be an exaggeration).
Second thing: the way you create the data as pandas and then transform it to a Dask DataFrame means you send a really big object that gets serialized from the scheduler to the workers on every iteration. Normally, data should be read directly by the workers. Adding something like:
ddf = ddf.persist()
should make things faster, but it will still take time. Distributed incremental learning takes more time than a closed-form mathematical formula. You can try to reduce the number of iterations.
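For example (sketch; if I remember correctly, the solver and iteration count are knobs you can turn on dask-ml's LinearRegression):

# Fewer iterations trade fit quality for runtime; these values are
# illustrative, not a recommendation.
model = DaskLinearRegression(fit_intercept=True, max_iter=50, tol=1e-4)
model.fit(X, y)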