Additional workers give only marginal speedup

I’m attempting to run millions of embarrassingly parallel ARIMA forecasting models and am using Dask distributed to help. (I’m using a custom selection process rather than AutoARIMA.)

Dask, of course, makes it simple to get this going, but, oddly, scaling efficiency seems to hit an inflection point once the number of single-threaded workers reaches about half of the CPU cores. For example, on a 14-core machine, four workers are only about 2x faster than a sequential single process, and eight workers are only about 3x faster.

The ARIMA modeling package is compiled with numba, and I suspect there is some hidden parallelism within it that could explain the diminishing returns. The machine’s resource monitor does not show the CPU at capacity when the inflection occurs.
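One thing I could try to rule that out is pinning the usual threading knobs before any numerical imports. A minimal sketch of what I mean (whether these particular environment variables are the ones the numba/BLAS code here actually respects is an assumption on my part):

import os

# Assumption: limiting these common threading knobs before numba/BLAS are
# imported would expose any hidden parallelism in the modeling package.
os.environ["NUMBA_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"

from statsforecast.models import ARIMA  # import only after the limits are set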

Below is a reproducible example that should show that doubling num_workers does not double performance. I would appreciate any tips to address the bottleneck, or any explanation of what is occurring.

import itertools
import os
from time import time
import warnings

import dask
from dask.distributed import Client
# import pandas as pd

warnings.filterwarnings("ignore")

# Enable numba caching; set before statsforecast is imported so it takes effect
os.environ['NIXTLA_NUMBA_CACHE'] = '1'

from statsforecast.utils import generate_series
from statsforecast.models import ARIMA

num_workers = 4
threads_per_worker = 1
num_data_sets = 10


def model_arima(y_df, all_arima_params):
    # Static model settings
    season = 4
    horizon_validate = 5
    horizon_forecast = 20

    # Hold out the last horizon_validate points for validation
    y_trimmed = y_df.iloc[:-horizon_validate]['y'].values
    all_y = y_df['y'].values

    collect_fit_models = []
    for order, seasonal in all_arima_params:
        arima = ARIMA(order=order,
                      seasonal_order=seasonal,
                      season_length=season,
                      include_constant=True
                      )

        fit = arima.fit(y=y_trimmed)
        # Validation forecast on the trimmed series, final forecast on the full series
        validate = arima.forecast(y=y_trimmed, h=horizon_validate, level=[80], fitted=True)
        forecast = arima.forecast(y=all_y, h=horizon_forecast, level=[80], fitted=False)

        collect_fit_models.append((fit.model_, validate, forecast))

    return collect_fit_models


if __name__ == '__main__':
    # Start dask workers
    client = Client(n_workers=num_workers, threads_per_worker=threads_per_worker)
    print(client.dashboard_link)

    # Generate data
    data_sets = generate_series(n_series=num_data_sets, seed=1, freq='QS', min_length=50, max_length=170)

    # Define parameter ranges
    ar_p = [0, 1, 2]  # , 3, 4]
    i_d = [0, 1, 2]
    ma_q = [0, 1, 2, 3, 4]
    sar_p = [0]  # ,1]
    si_d = [0]  # ,1]
    sma_q = [0]  # ,1]

    all_arima_params = [((p, d, q), (sp, sd, sq)) for (p, d, q, sp, sd, sq) in
                        itertools.product(ar_p, i_d, ma_q, sar_p, si_d, sma_q)]

    num_models = num_data_sets * len(all_arima_params)
    print(f"Models to calc {num_models}")

    # Create list of delayed tasks for workers
    compute_list = []
    for _, data_set in data_sets.groupby('unique_id'):
        compute_list.append(dask.delayed(model_arima)(data_set, all_arima_params))

    # Use dask to compute models
    print("Starting arima modeling")
    st = time()
    out = dask.compute(compute_list)
    duration = time() - st
    print(f"Modeling duration seconds {duration}")

    # More would occur from here....

Hi @ryan, welcome to the Dask Discourse forum!

I really don’t know much about ARIMA, and unfortunately I ran into msgpack problems when trying to run your example.

Here is what I would do to better understand what is going on:

  • Monitor and profile a sequential execution: what do CPU usage and other resources look like?
  • Try running the simulations with a single Worker process: what overhead does Dask introduce?
  • Try running two or more batches of simulations in parallel outside of Dask (see the sketch just after this list).
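For the last point, I mean roughly something like this (an untested sketch on my side, reusing model_arima, data_sets, all_arima_params and num_workers from your snippet; as in your example, it should stay under the if __name__ == '__main__': guard):

from concurrent.futures import ProcessPoolExecutor
from itertools import repeat
from time import time

# Run the same batches in parallel with plain multiprocessing, no Dask involved,
# to see whether the scaling limit comes from Dask or from the computation itself.
frames = [frame for _, frame in data_sets.groupby('unique_id')]

st = time()
with ProcessPoolExecutor(max_workers=num_workers) as pool:
    results = list(pool.map(model_arima, frames, repeat(all_arima_params)))
print(f"Plain multiprocessing duration seconds {time() - st}")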

Dask is simple to use, but it does introduce some overhead, depending also on your task metrics: typically, how long does one simulation last? If it is more than a few seconds, and if each simulation is single-threaded, then you should see almost linear scaling as you increase the number of Workers. Are the simulations purely CPU-bound? Do they perform any IO?
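To answer the first question, you could also time a single batch with no Dask at all; a minimal sketch, again reusing the names from your example:

from time import time

# Time one batch of models sequentially, without Dask, to get a per-task baseline.
_, one_frame = next(iter(data_sets.groupby('unique_id')))

st = time()
model_arima(one_frame, all_arima_params)
print(f"One batch, no Dask, duration seconds {time() - st}")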

I may have hit the same msgpack error, but downgrading to Python 3.10 resolved it. I’m not sure what the underlying msgpack problem is, but I believe it is unrelated to my situation. An environment.yml would indeed make this more reproducible, and I can provide one if requested.

I can’t say I fully understand ARIMA either (I’m just the engineer implementing the data scientist’s code), but I do know the numba part is doing matrix math. Each delayed task runs 600 loops, and each loop takes about 80–500 milliseconds depending on the ARIMA parameters (CPU speed will vary the results).

My hunch is that it is not Dask slowing things down but something else, and it bothers me that I don’t have an explanation. I appreciate you taking the time and offering suggestions on what to look into.