# Use Dask for a time step simulation

I have some simulation code written in Python that runs several models for a number of time steps. An initial set of input parameters are given to the models at the first time step. The models use the parameters to generate results. Those results are used to calculate the input parameters for the next time step. The process continues until the final time step. A summary of this process is listed below.

1. Define initial parameters
2. Calculate results based on initial parameters
3. Update parameters based on the results
4. Calculate results based on the updated parameters
5. Repeat 3 and 4 until final step

The examples below are simplified versions of the simulation code but they demonstrate the main components of the program. The sleep statements represent areas where some computational overhead would occur in the actual simulation program.

## Example 1 (serial, no Dask)

```python
import numpy as np
import time

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    for step in steps:

        step_results = []
        for p in params[step]:
            step_results.append(run_model(p))

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()

    print(f'\nSerial elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()
```

## Example 2 (Dask, client.map, client.gather)

```python
import numpy as np
import time

from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    client = get_client()

    for step in steps:

        futures = client.map(run_model, params[step])
        step_results = client.gather(futures)

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    nresults = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)

    main()

    client.close()
```

On an 8-core CPU, Example 1 takes about 49 seconds to run while Example 2 takes 19 seconds. Using parameters of

```python
nsteps = 20
nmodels = 8
init_params = np.random.random(nmodels) * 10
```

gives a run time of 179 seconds for Example 1 and 42 seconds for Example 2. The Dask example is clearly taking advantage of the CPU cores. My questions are:

• Are `client.map` and `client.gather` the best approach to run the models in parallel?
• Can I use Dask Array to speed up the execution of the code?
• Are Dask Actors suitable for this type of problem?
• Is there a way to run the steps in parallel?

I’m basically wondering if there is a better way to use Dask for this type of problem.


Hi! The reason dask cannot speed this up is that it still needs to compute every step sequentially. If you look at the task graph, you can see that every step depends on the result of the previous calculation. Dask cannot parallelize anything here and is forced to run one task after the other, just like a simple for loop does.

You can also see this on the dashboard if you go to the “Graph” page

I see the parameter `nmodels`. If you have `nmodels` independent calculations, that is something you can easily parallelize.
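To make that dependency structure concrete, here is a toy `dask.delayed` sketch (the names and numbers are illustrative stand-ins, not the original code): within a step the model tasks fan out in parallel, while `calc_params` forms a barrier between steps.

```python
import dask
from dask import delayed

@delayed
def run_model(p):
    # stand-in for one model's work at one step
    return p + 1

@delayed
def calc_params(res):
    # stand-in for the parameter update; needs *all* results of a step
    return [r * 1.1 for r in res]

init_params = [5, 4.5, 8, 2]
step0 = [run_model(p) for p in init_params]        # 4 independent tasks
params1 = calc_params(step0)                       # barrier: depends on all of step 0
step1 = [run_model(params1[i]) for i in range(4)]  # 4 independent tasks again

# dask.visualize(*step1)  # would render the fan-out/barrier structure
results = dask.compute(*step1)
```

Computing `step1` shows the chain: each step-1 task depends on `params1`, which in turn depends on every step-0 task.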


The `calc_results` function represents calculations performed by several models. The purpose of the `nmodels` variable is to define the number of models. This defines the number of columns in the parameters and results arrays. Each item in the parameters list represents an input parameter to a model. That model performs some calculations based on that input and returns the result. So based on your comment, I should parallelize the `calc_params` and `calc_results` functions?

I would keep the individual steps in a tight for loop and parallelize over the models, not within `calc_results`. However, I don’t know about the internals of these functions. My approach would look similar to the code below (modulo bugs and typos):

```python
@delayed
def do_single_model(model_ix, model_param, nsteps=10):
    step_result = None  # init
    for n in range(nsteps):
        step_result = calc_step_single_model(model_param)
        model_param = calc_params_single_model(step_result)
    return step_result

results = []
for m in range(nmodels):
    results.append(do_single_model(m, params[m]))
client.gather(client.compute(results))
```

I tried this approach but it’s even slower than my original Dask example for small (4) and large (40) number of models.

## Example 3

```python
import numpy as np
import random
import time

from dask import delayed
from dask.distributed import Client, get_client

@delayed
def calc_param(res: float) -> float:
    time.sleep(random.random())
    param = res * 1.1
    return param

@delayed
def calc_result(p: float) -> float:
    time.sleep(random.random())
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    # nsteps = 10
    # nmodels = 4
    # init_params = [5, 4.5, 8, 2]

    nsteps = 100
    nmodels = 40
    init_params = np.random.random(nmodels) * 10

    results = []
    params = init_params
    steps = list(range(nsteps))

    for step in steps:

        step_results = []
        psnew = []
        for i in range(nmodels):
            p = params[i]
            result = calc_result(p)
            step_results.append(result)

            pnew = calc_param(result)
            psnew.append(pnew)

        results.append(step_results)

        if step < nsteps - 1:
            params = psnew

    client = get_client()
    cresults = client.gather(client.compute(results))
    nresults = np.array(cresults)

    toc = time.perf_counter()

    print(f'\nElapsed time {toc - tic:.2f} s\n')
    # print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)

    main()

    client.close()
```

I said something to this effect on Slack before you opened this topic, but repeating here:

I think it’s worth thinking about what the best case scenario is given perfect parallelism. You are `time.sleep()`ing for a random amount between zero and one second, so an average of 0.5 seconds. So each time step, which has a `calc_param` and a `calc_result`, takes an average of one second. For a 100-step simulation, that works out to 100 seconds per model run.

So if I have a cluster with 8 workers and am running 40 models, I’d expect the amount of time to be `1s * n_time_steps * n_models / n_workers` = 500 seconds. And when I run your code, that’s exactly what I get (to be more precise, I got `504.99s`). So that all makes sense!
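The back-of-the-envelope estimate above can be written out as plain arithmetic (using the sleep parameters from Example 3):

```python
# Ideal-parallelism estimate for Example 3 (nsteps=100, nmodels=40, 8 workers).
avg_sleep = 0.5                         # mean of time.sleep(random.random())
per_model_per_step = 2 * avg_sleep      # one calc_result + one calc_param

total_work = per_model_per_step * 100 * 40  # total sleep time across all tasks
ideal_wall_clock = total_work / 8           # perfect scaling over 8 workers

print(total_work, ideal_wall_clock)  # 4000.0 s of work, 500.0 s wall clock
```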

So why did your initial example run so much faster? That’s because each time step is sharing the same `time.sleep()`s, so you are actually `sleep`ing `2*nsteps` times, when I think you intended to be `sleep`ing `2*nsteps*nmodels` times. TL;DR, I don’t think your non-Dask model is quite measuring what you wanted, and the Dask one is showing expected results given the embarrassingly-parallel model runs.

I updated my posts with labels for the examples. This should make it easier to clarify which code example you are referring to. I also added a table of elapsed times in the original post.

Each model should calculate its result at each time step. The updated input parameters for all the models can be calculated once at each time step. Assume it takes 1 second for each model to calculate its result at each time step, and assume it takes 1 second to update all the input parameters at a time step. Therefore the time taken at each time step would be `1 s * nmodels + 1 s`. But I don’t think that is what Example 1 is conveying.

I suppose what I am trying to say is that your Example 3 already exhibits embarrassingly parallel scaling. If the models are being executed independently of each other, then each one should be taking about `1s * nsteps`, and there is no way to bring the individual run time down. But by parallelizing across model runs, the wall clock time can come down by a factor of `n_workers`.

Put another way: your `sleep()` statements are stand-ins for simulation work. What is the total amount of “work” time you expect? For your Example 3, it should be ~4000 seconds. If each of your eight workers is spending 100% of the time doing useful work, the computation will be done in 4000/8=500 seconds (not ~100 as I think you are suggesting)

I’d suggest organizing it in the way that @fjetter is suggesting, keeping the individual runs more isolated, which I think would help in describing the problem.


@Ian and @fjetter I updated my original post and the original code examples. Sorry for all the changes but hopefully it better conveys what I’m trying to do.

@fjetter Based on Example 1 and Example 2, can you provide a working example of the approach you suggested?

Can you describe what changes you made? It can be difficult to follow this conversation when the original example snippets are changing, and our earlier comments may or may not still apply.

It’s an okay approach, but I’d note that by calling `client.gather` at every time step you are forcing the individual models to synchronize at every step. Since they are separate from each other, this could cause significant slowdown if some of them finish much earlier than others, and are then forced to wait on their counterparts. I’ll second what @fjetter suggested: try to set up your models so that they are fully independent of each other, that way they won’t block each other.
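If the parameter update really decomposes per model (a caveat discussed later in this thread), the barrier-free layout can be sketched with just the standard library; `run_model` and `calc_param` here are simplified stand-ins, not the original simulation functions:

```python
from concurrent.futures import ThreadPoolExecutor

def run_model(p):
    # simplified stand-in for one model's work at one step
    return p + 1

def calc_param(res):
    # per-model parameter update: only valid if the models are independent
    return res * 1.1

def run_one_model(p, nsteps=10):
    # One model's entire chain of steps runs as a single task,
    # so no model ever waits at a per-step barrier for the others.
    res = None
    for step in range(nsteps):
        res = run_model(p)
        if step < nsteps - 1:
            p = calc_param(res)
    return res

init_params = [5, 4.5, 8, 2]
with ThreadPoolExecutor(max_workers=4) as ex:
    finals = list(ex.map(run_one_model, init_params))
```

The same shape works with `dask.delayed` or `client.submit`: one task per model, no `gather` inside the step loop.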

Maybe, but it really depends on your specific model solves (represented by sleeps right now). I don’t think you should be trying to force your set of model runs into an Array, this would be for computations within the runs.

Probably not – Dask actors are still a somewhat experimental tech and may not be supported in the future. I don’t think this problem requires them.

I’m not sure exactly what you mean here, but I think the best thing to do for your setup is to parallelize across model runs and let the steps proceed as fast as they can.

I don’t understand how I can parallelize the model runs. `calc_params()` uses the results from all of the models to update all the input parameters at each time step. So if the models are fully independent, how would I use `calc_params()` to update the time step parameters?

Are the parameters for each model in `calc_params` independent of each other, or do the models depend on each other? Based on my reading of your sample code, the models are independent, so it should be possible to `calc_params` on a per-model basis.

In the original simulation code, the models are not fully independent from each other. The results from all the models are used to update the input parameters for the next time step. In Example 1, I tried to demonstrate this with `calc_params()` which takes a list of results instead of a single result value.

If they are not independent of each other, then using your `map`/`gather` approach seems reasonable (though if they aren’t independent of each other, I question how they are considered several different models?).

Overall, I would probably recommend doing something similar to your third example to build the whole graph using `dask.delayed`, which would force a more explicit dependency structure of your computation.

Ok, I’ll try to make a `dask.delayed` version of Example 1.

## Example 4

Here’s my attempt at using `dask.delayed` for Example 1. I get the same performance as Example 2, which is probably because it computes the results at each time step. If anyone knows how to make a better `dask.delayed` version, please post an example.

```python
import numpy as np
import time

from dask import delayed
from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

@delayed
def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))
    client = get_client()

    for step in range(nsteps):

        futures = []
        for p in params[step]:
            result = run_model(p)
            futures.append(result)
        step_results = client.gather(client.compute(futures))

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)

    main()

    client.close()
```

Defining both `calc_params` and `run_model` as delayed, the following code parallelizes over the model runs and synchronizes afterwards to calc new parameters. See graph below.
FWIW, this triggers a bug in our dashboard, so you will see a lot of warnings about a cyclic reference. I haven’t looked into this yet, but the computation works and it should not raise a cyclic ref warning imo.

```python
for step in range(nsteps):

    futures = []
    for ix in range(len(init_params)):
        result = run_model(params[ix])
        futures.append(result)
    step_results = futures

    results.append(step_results)

    params = calc_params(step_results)
```

Based on everyone’s feedback, I compared three Dask approaches to the non-Dask example. The code for each example is given below, along with a comparison of the elapsed times on an 8-CPU-core machine. The `client.map` and `client.gather` approach (Example 2) seems to give the best time. I’m open to suggestions for improvement, but I think I have some good ideas on how to move forward.

## Example 1 (serial, no Dask)

```python
import numpy as np
import time

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    for step in steps:

        step_results = []
        for p in params[step]:
            step_results.append(run_model(p))

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()

    print(f'\nSerial elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()
```

## Example 2 (Dask, client.map, client.gather)

```python
import numpy as np
import time

from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    client = get_client()

    for step in steps:

        futures = client.map(run_model, params[step])
        step_results = client.gather(futures)

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    nresults = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)

    main()

    client.close()
```

## Example 3 (Dask Array, map_blocks)

```python
import numpy as np
import time

import dask.array as da
from dask.distributed import Client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    c = 1 if nmodels // 8 == 0 else nmodels // 8

    for step in steps:

        ps = da.from_array(params[step], chunks=c)
        futures = da.map_blocks(run_model, ps)
        step_results = futures.compute()

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    nresults = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)

    main()

    client.close()
```

## Example 4 (Dask delayed)

```python
import numpy as np
import time

from dask import delayed
from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    params = []
    params.append(init_params)

    results = []

    for step in range(nsteps):
        step_results = []

        for m in range(nmodels):
            result = delayed(run_model)(params[step][m])
            step_results.append(result)

        results.append(step_results)

        if step < nsteps - 1:
            params.append(delayed(calc_params)(step_results))

    client = get_client()
    y = client.gather(client.compute([params, results]))

    params, results = y
    params = np.array(params)
    results = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)

    main()

    client.close()
```

## Comparison of examples

Parameters:

```python
nsteps = 20
nmodels = 16
init_params = np.random.random(nmodels) * 10
n_workers = 8
```

Elapsed times:

Example Description Elapsed time (s)

The difference between your Examples 2 and 4 should not be that large. I would expect it to vanish if you take multiple measurements. While you are using a different API, the two examples are functionally almost equivalent. The exception is the call to `calc_params`: Example 2 calculates it in the main process (not using the client), while Example 4 wraps it in a delayed and therefore calculates it on a cluster process. I typically recommend doing all heavy computations on the cluster instead of the main process. Example 4 is also preferred if you ever move to a distributed cluster, since it does not require the local machine to participate in the computation, which has benefits your micro-benchmarks don’t capture (e.g. network).

A final change I would recommend looking into is a variation of Example 4. I see you are using

```python
client = get_client()
y = client.gather(client.compute([params, results]))
```

which can instead be expressed by

```python
import dask

# client = get_client()  # This is optional
y, = dask.compute([params, results])
```

The benefit of the latter is that it can be run even without a `Client`. Not using a client will cause dask to fall back to a simple local thread pool; if a client is available, it is used instead. This typically gives a little more freedom and makes writing tests, etc. a bit easier. For instance, you can do `dask.compute([params, results], scheduler='sync')` to execute everything in the same process for debugging.
For running the actual computation you should use a client/cluster, though. Simply instantiate the client and dask will detect and use it automatically.
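As a small self-contained illustration of that fallback behaviour (with a trivial `inc` function standing in for `run_model`):

```python
import dask
from dask import delayed

@delayed
def inc(x):
    # trivial stand-in for a model calculation
    return x + 1

# No Client exists here, so dask.compute falls back to a local scheduler.
(computed,) = dask.compute([inc(1), inc(2), inc(3)])

# scheduler='sync' runs every task in this process, which is handy
# for stepping through with a debugger.
(computed_sync,) = dask.compute([inc(1), inc(2), inc(3)], scheduler='sync')
```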

Example 4b is based on the suggestion by @fjetter. I ran the examples again to check for consistency (see table below). As expected, the results are similar to the previous runs.

## Example 4b (Dask delayed, dask.compute)

```python
import numpy as np
import time

import dask
from dask import delayed
from dask.distributed import Client

@delayed
def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

@delayed
def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    params = []
    params.append(init_params)

    results = []

    for step in range(nsteps):
        step_results = []

        for m in range(nmodels):
            p = params[step][m]
            result = run_model(p)
            step_results.append(result)

        results.append(step_results)

        if step < nsteps - 1:
            ps = calc_params(step_results)
            params.append(ps)

    params, results = dask.compute(params, results)
    params = np.array(params)
    results = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)

    main()

    client.close()
```

## Comparison of examples

Parameters:

```python
nsteps = 20
nmodels = 16
init_params = np.random.random(nmodels) * 10
n_workers = 8
```

Elapsed times:

| Example | Description | Elapsed time (s) |
|---------|-------------|------------------|
| 1 | serial, no Dask | 340.05 |
| 2 | Dask `client.map`, `client.gather` | 43.10 |