I have some simulation code written in Python that runs several models over a number of time steps. An initial set of input parameters is given to the models at the first time step. The models use these parameters to generate results, which are then used to calculate the input parameters for the next time step. The process continues until the final time step. A summary of this process is listed below.

1. Define initial parameters
2. Calculate results based on the initial parameters
3. Update the parameters based on the results
4. Calculate results based on the updated parameters
5. Repeat steps 3 and 4 until the final time step

The examples below are simplified versions of the simulation code, but they demonstrate the main components of the program. The `sleep` statements represent areas where some computational overhead would occur in the actual simulation program.

## Example 1 (no Dask)

```python
import numpy as np
import time


def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params


def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result


def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]
    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    for step in steps:
        step_results = []
        for p in params[step]:
            step_results.append(run_model(p))
        results[step] = step_results
        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()
    print(f'\nSerial elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')


if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()
```

## Example 2 (Dask)

```python
import numpy as np
import time
from dask.distributed import Client, get_client


def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params


def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result


def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]
    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    client = get_client()

    for step in steps:
        futures = client.map(run_model, params[step])
        step_results = client.gather(futures)
        results[step] = step_results
        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    nresults = np.array(results)

    toc = time.perf_counter()
    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')


if __name__ == '__main__':
    np.set_printoptions(precision=2)
    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)
    main()
    client.close()
```

On an 8 core CPU, Example 1 takes about 49 seconds to run while Example 2 takes 19 seconds. Using parameters of

```
nsteps = 20
nmodels = 8
init_params = np.random.random(nmodels) * 10
```

gives a run time of 179 seconds for Example 1 and 42 seconds for Example 2. So the Dask example is clearly taking advantage of the CPU cores. My questions are:

- Are `client.map` and `client.gather` the best approach to run the models in parallel?
- Can I use Dask Array to speed up the execution of the code?
- Are Dask Actors suitable for this type of problem?
- Is there a way to run the steps in parallel?

I'm basically wondering if there is a better way to use Dask for this type of problem.
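For reference, here is one variant I sketched while experimenting (sleeps removed and sizes shrunk so it runs quickly). It chains futures with `client.submit` and `operator.getitem` so that the per-step values stay on the workers instead of round-tripping through the client after every step, with a single `gather` at the end. I'm not sure whether this is a recommended pattern, which is partly why I'm asking:

```python
from operator import getitem

import numpy as np
from dask.distributed import Client


def run_model(p: float) -> float:
    # stand-in for an expensive model evaluation
    return p + 1


def calc_params(res: list) -> list:
    # stand-in for the parameter update
    return [r * 1.1 for r in res]


def run_simulation(init_params, nsteps):
    """Chain futures so intermediate values stay on the workers."""
    nmodels = len(init_params)
    client = Client(processes=False, n_workers=2)

    # scatter the initial parameters; returns one future per value
    param_futures = client.scatter(list(init_params))

    result_futures = []
    for step in range(nsteps):
        # one model evaluation per parameter future
        step_futures = [client.submit(run_model, p) for p in param_futures]
        result_futures.append(step_futures)
        if step < nsteps - 1:
            # calc_params receives the model futures directly; the
            # getitem submits split its output back into per-model
            # futures without pulling anything to the client
            new_params = client.submit(calc_params, step_futures)
            param_futures = [client.submit(getitem, new_params, i)
                             for i in range(nmodels)]

    # single blocking transfer at the very end
    results = np.array(client.gather(result_futures))
    client.close()
    return results


if __name__ == '__main__':
    np.set_printoptions(precision=2)
    print(run_simulation([5, 4.5, 8, 2], nsteps=3))
```

This still cannot run the *steps* in parallel, since each step's parameters depend on the previous step's results, but it does let the scheduler start each step's models as soon as their individual parameters are ready.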