I’m attempting to run millions of embarrassingly parallel ARIMA forecasting models and am using Dask distributed to help. (I’m using a custom selection process rather than AutoARIMA.)
Dask makes it simple to get this going, but, oddly, there seems to be an efficiency inflection point once the number of single-threaded workers reaches roughly half of the CPU cores. For example, on a 14-core machine, four workers are only about 2x faster than a sequential single process, and eight workers are only about 3x faster.
The ARIMA modeling package is compiled with numba, and I’m wondering whether there is some hidden parallelism inside it that could explain the diminishing returns. The machine’s resource monitor doesn’t indicate the CPU is at capacity when the inflection occurs.
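If hidden parallelism is the culprit, I assume I could rule it out by capping the usual threading environment variables before anything numba/NumPy related is imported (these are the standard numba/BLAS knobs, not anything specific to statsforecast) and then confirming the thread count on the workers. A rough sketch of what I have in mind:

import os

# Cap implicit threading before numpy / numba / statsforecast are imported,
# so any hidden parallel sections run single-threaded per worker.
os.environ["NUMBA_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"

from dask.distributed import Client

def numba_threads():
    import numba
    return numba.get_num_threads()

if __name__ == "__main__":
    client = Client(n_workers=4, threads_per_worker=1)
    # Locally spawned workers inherit the parent environment,
    # so this should report 1 thread for every worker.
    print(client.run(numba_threads))

Would that be enough to rule the hidden-parallelism theory in or out?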
Below is a reproducible example that should show that doubling num_workers does not double the performance. I would appreciate any tips on addressing the bottleneck, or any explanation of what is going on.
import itertools
import os
from time import time
import warnings

import dask
from dask.distributed import Client

warnings.filterwarnings("ignore")

# Must be set before statsforecast is imported so its numba functions cache compiled code
os.environ['NIXTLA_NUMBA_CACHE'] = '1'

from statsforecast.utils import generate_series
from statsforecast.models import ARIMA
num_workers = 4
threads_per_worker = 1
num_data_sets = 10
def model_arima(y_df, all_arima_params):
    # Static model settings
    season = 4
    horizon_validate = 5
    horizon_forecast = 20

    all_y = y_df['y'].values
    y_trimmed = all_y[:-horizon_validate]  # hold out the last points for validation

    collect_fit_models = []
    for order, seasonal in all_arima_params:
        arima = ARIMA(order=order,
                      seasonal_order=seasonal,
                      season_length=season,
                      include_constant=True)
        fit = arima.fit(y=y_trimmed)
        validate = arima.forecast(y=y_trimmed, h=horizon_validate, level=[80], fitted=True)
        forecast = arima.forecast(y=all_y, h=horizon_forecast, level=[80], fitted=False)
        collect_fit_models.append((fit.model_, validate, forecast))
    return collect_fit_models
if __name__ == '__main__':
    # Start dask workers
    client = Client(n_workers=num_workers, threads_per_worker=threads_per_worker)
    print(client.dashboard_link)

    # Generate data
    data_sets = generate_series(n_series=num_data_sets, seed=1, freq='QS',
                                min_length=50, max_length=170)

    # Define parameter ranges for the candidate (p, d, q)(P, D, Q) models
    ar_p = [0, 1, 2]  # , 3, 4]
    i_d = [0, 1, 2]
    ma_q = [0, 1, 2, 3, 4]
    sar_p = [0]  # , 1]
    si_d = [0]  # , 1]
    sma_q = [0]  # , 1]
    all_arima_params = [((p, d, q), (sp, sd, sq)) for (p, d, q, sp, sd, sq) in
                        itertools.product(ar_p, i_d, ma_q, sar_p, si_d, sma_q)]
    num_models = num_data_sets * len(all_arima_params)
    print(f"Models to calc {num_models}")

    # Create list of delayed tasks, one per series
    compute_list = []
    for _, data_set in data_sets.groupby('unique_id'):
        compute_list.append(dask.delayed(model_arima)(data_set, all_arima_params))

    # Use dask to compute models
    print("Starting arima modeling")
    st = time()
    out = dask.compute(compute_list)
    duration = time() - st
    print(f"Modeling duration seconds {duration}")

    # More would occur from here....
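For reference, this is roughly how I compare worker counts. It is only a sketch (the helper name and the worker counts are placeholders), with a fresh local cluster per measurement; note the first run also pays the numba compilation cost unless the cache is already warm.

from time import time
import dask
from dask.distributed import Client

def time_run(n_workers, compute_list):
    # Start a fresh local cluster per measurement so runs don't interfere.
    with Client(n_workers=n_workers, threads_per_worker=1):
        st = time()
        dask.compute(compute_list)
        return time() - st

# e.g. durations = {n: time_run(n, compute_list) for n in (1, 2, 4, 8)}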