Use Dask for a time step simulation

I have some simulation code written in Python that runs several models for a number of time steps. An initial set of input parameters are given to the models at the first time step. The models use the parameters to generate results. Those results are used to calculate the input parameters for the next time step. The process continues until the final time step. A summary of this process is listed below.

  1. Define initial parameters
  2. Calculate results based on initial parameters
  3. Update parameters based on the results
  4. Calculate results based on the updated parameters
  5. Repeat 3 and 4 until final step

The examples below are simplified versions of the simulation code but they demonstrate the main components of the program. The sleep statements represent areas where some computational overhead would occur in the actual simulation program.

Example 1 (no Dask)

import numpy as np
import time

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    for step in steps:

        step_results = []
        for p in params[step]:
            step_results.append(run_model(p))

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()

    print(f'\nSerial elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()

Example 2 (Dask)

import numpy as np
import time
from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    client = get_client()

    for step in steps:

        futures = client.map(run_model, params[step])
        step_results = client.gather(futures)

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    nresults = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

On an 8 core CPU, Example 1 takes about 49 seconds to run while Example 2 takes 19 seconds. Using parameters of

nsteps = 20
nmodels = 8
init_params = np.random.random(nmodels) * 10

gives a run time of 179 seconds for Example 1 and 42 seconds for Example 2. So the Dask example is definitely taking advantage of the CPU cores. So my questions are:

  • Is client.map and client.gather the best approach to run the models in parallel?
  • Can I use Dask Array to speed up the execution of the code?
  • Are Dask Actors suitable for this type of problem?
  • Is there a way to run the steps in parallel?

I’m basically wondering if there is a better way to use Dask for this type of problem.

1 Like

Hi! The reason why dask cannot speed this up is because it still needs to compute every step sequentially. If you look at the task graph, you can see that every next step depends on the result of the previous calculation. Dask cannot parallelize anything here and is forced to run one task after the other, just like you are doing when calling a simple for loop.

You can also see this on the dashboard if you go to the “Graph” page

I see the parameter nmodels. If you have nmodels independent calculations, this is something you can easily parallelize

1 Like

The calc_results function represents calculations performed by several models. The purpose of the nmodels variable is to define the number models. This defines the number of columns in the parameters and results arrays. Each item in the parameters list represents an input parameter to a model. That model performs some calculations based on that input and returns the result. So based on your comment, I should parallelize the calc_params and calc_results functions?

I would keep the individual steps in a tight for loop and parallelize over the models. Not parallelize within the calc_results. However, I don’t know about the internals of these functions. My approach would look similar to the code below (modulo bugs and typos)

@delayed
def do_single_model(model_ix, model_param, nsteps=10):
    step_result = None # init
    for n in range(nsteps):
        params = calc_params_single_model(step_results)
        step_result = calc_step_single_model(model_param)
    return step_result

results = []
for m in range(nmodels):
    results.append(do_single_model(m, params[m]))
client.gather(results)

I tried this approach but it’s even slower than my original Dask example for small (4) and large (40) number of models.

Example 3

import numpy as np
import random
import time
from dask import delayed
from dask.distributed import Client, get_client

@delayed
def calc_param(res: float) -> float:
    time.sleep(random.random())
    param = res * 1.1
    return param

@delayed
def calc_result(p: float) -> float:
    time.sleep(random.random())
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    # nsteps = 10
    # nmodels = 4
    # init_params = [5, 4.5, 8, 2]

    nsteps = 100
    nmodels = 40
    init_params = np.random.random(nmodels) * 10

    results = []
    params = init_params
    steps = list(range(nsteps))

    for step in steps:

        step_results = []
        psnew = []
        for i in range(nmodels):
            p = params[i]
            result = calc_result(p)
            step_results.append(result)

            pnew = calc_param(result)
            psnew.append(pnew)

        results.append(step_results)

        if step < nsteps - 1:
            params = psnew

    client = get_client()
    cresults = client.gather(client.compute(results))
    nresults = np.array(cresults)

    toc = time.perf_counter()

    print(f'\nElapsed time {toc - tic:.2f} s\n')
    # print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

I said something to this effect on Slack before you opened this topic, but repeating here:

I think it’s worth thinking about what the best case scenario is given perfect parallelism. You are time.sleep() ing for a random amount between zero and one second, so an average of 0.5 seconds. So each time step, which has a calc_param and a calc_result take an average of one second. For a 100-step simulation, that works out to 100 seconds per model run.

So if I have a cluster with 8 workers and am running 40 models, I’d expect the amount of time to be 1s * n_time_steps * n_models / n_workers = 500 seconds. And when I run your code, that’s exactly what I get (to be more precise, I got 504.99s). So that all makes sense!

So why did your initial work run so much faster? That’s because each time step is sharing the same time.sleep()s, so you are actually sleeping 2*nsteps times, when I think you intended to be sleeping 2*nsteps*nmodels times. TL;DR, I don’t think your non-Dask model is quite measuring what you wanted, and the Dask one is showing expected results given the embarrassingly-parallel model runs.

I updated my posts with labels for the examples. This should make it easier to clarify which code example you are referring to. I also added a table of elapsed times in the original post.

In reply to your comment…
Each model should calculate its result at each time step. The updated input parameters for all the models can be calculated once at each time step. Assume it takes 1 second for each model to calculate its result at each time step. And assume it takes 1 second to update all the input parameters parameters at a time step. Therefore the time taken at each time step would be
1 s * nmodels + 1 s. But I don’t think that is what Example 1 is conveying.

I suppose what I am trying to say is that your Example 3 already exhibits embarrassingly parallel scaling. If the models are being executed independently of each other, then each one should be taking about 1s * nsteps, and there is no way to bring the individual run time down. But by parallelizing across model runs, the wall clock time can come down by a factor of n_workers.

Put another way: your sleep() statements are stand-ins for simulation work. What is the total amount of “work” time you expect? For your Example 3, it should be ~4000 seconds. If each of your eight workers is spending 100% of the time doing useful work, the computation will be done in 4000/8=500 seconds (not ~100 as I think you are suggesting)

I’d suggest organizing it in the way that @fjetter is suggesting, keeping the individual runs more isolated, which I think would help in describing the problem.

1 Like

@Ian and @fjetter I updated my original post and the original code examples. Sorry for all the changes but hopefully it better conveys what I’m trying to do.

@fjetter Based on Example 1 and Example 2, can you provide a working example of the approach you suggested?

Can you describe what changes you made? It can be difficult to follow this conversation when the original example snippets are changing, and our earlier comments may or may not still apply.

To your (new) questions:

It’s an okay approach, but I’d note that by calling client.gather at every time step you are forcing the individual models to synchronize at every step. Since they are separate from each other, this could cause significant slowdown if some of them finish much earlier than others, and are then forced to wait on their counterparts. I’ll second what @fjetter suggested: try to set up your models so that they are fully independent of each other, that way they won’t block each other.

Maybe, but it really depends on your specific model solves (represented by sleeps right now). I don’t think you should be trying to force your set of model runs into an Array, this would be for computations within the runs.

Probably not – Dask actors are still a somewhat experimental tech and may not be supported in the future. I don’t think this problem requires them.

I’m not sure exactly what you mean here, but I think the best think to do for your setup is to parallelize across model runs, and letting the steps proceed as fast as they can.

I don’t understand how I can parallelize the model runs. The calc_param() uses the results from all of the models to update all the input parameters at each time step. So if the models are fully independent how would I use calc_params() to update the time step parameters?

Are the parameters for each model in calc_params independent of each other, or do the models depend on each other? Based on my reading of your sample code, the models are independent, so it should be possible to calc_params on a per-model basis.

In the original simulation code, the models are not fully independent from each other. The results from all the models are used to update the input parameters for the next time step. In Example 1, I tried to demonstrate this with calc_params() which takes a list of results instead of a single result value.

If they are not independent of each other, then using your map/gather approach seems reasonable (though if they aren’t independent of each other, I question how they are considered several different models?).

Overall, I would probably recommend doing something similar to your third example to build the whole graph using dask.delayed, which would force a more explicit dependency structure of your computation.

Ok, I’ll try to make a dask.delayed version of Example 1.

Example 4

Here’s my attempt at using dask.delayed for Example 1. I get the same performance as Example 2 which is probably because it computes the results at each time step. If anyone knows how to make a better dask.delayed version then please post an example.

import numpy as np
import time
from dask import delayed
from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

@delayed
def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    # nsteps = 20
    # nmodels = 8
    # init_params = np.random.random(nmodels) * 10

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))
    client = get_client()

    for step in range(nsteps):

        futures = []
        for p in params[step]:
            result = run_model(p)
            futures.append(result)
        step_results = client.gather(client.compute(futures))

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()

    print(f'\nSerial elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

defining both calc_params and run_model as delayed, the following code parallelizes over the model runs and synchronizes afterwards to calc new parameters. See graph below.
fwiw, this triggers a bug in our dashboard so you will see a lot of warnings about a cyclic reference. I haven’t looked into this, yet, but the computation works and it should not raise a cyclic ref warning imo

for step in range(nsteps):

    futures = []
    for ix in range(len(init_params)):
        result = run_model(params[ix])
        futures.append(result)
    step_results = futures

    results.append(step_results)

    params = calc_params(step_results)

Based on everyone’s feedback, I compared three Dask approaches to the non-Dask example. The example codes are given below along with a comparison of the elapsed times using an 8 CPU core machine. The client.map and client.gather approach (Example 2) seems to give the best time. I’m open to suggestions for improvement but I think I have some good ideas on how to move forward.

Example 1 (serial, no Dask)

import numpy as np
import time

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    for step in steps:

        step_results = []
        for p in params[step]:
            step_results.append(run_model(p))

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()

    print(f'\nSerial elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()

Example 2 (Dask, client.map, client.gather)

import numpy as np
import time
from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    client = get_client()

    for step in steps:

        futures = client.map(run_model, params[step])
        step_results = client.gather(futures)

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    nresults = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

Example 3 (Dask, map_blocks)

import numpy as np
import time
import dask.array as da
from dask.distributed import Client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    c = 1 if nmodels // 8 == 0 else nmodels // 8

    for step in steps:

        ps = da.from_array(params[step], chunks=c)
        futures = da.map_blocks(run_model, ps)
        step_results = futures.compute()

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    nresults = np.array(results)

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{nresults}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

Example 4 (Dask, delayed)

import numpy as np
import time
from dask import delayed
from dask.distributed import Client, get_client

def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    params = []
    params.append(init_params)

    results = []

    for step in range(nsteps):
        step_results = []

        for m in range(nmodels):
            result = delayed(run_model)(params[step][m])
            step_results.append(result)

        results.append(step_results)

        if step < nsteps - 1:
            params.append(delayed(calc_params)(step_results))

    client = get_client()
    y = client.gather(client.compute([params, results]))

    params, results = zip(y)
    params = np.array(params)
    results = np.array(results)

    toc = time.perf_counter()

    print(f'\nSerial elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

Comparison of examples

Parameters:

nsteps = 20
nmodels = 16
init_params = np.random.random(nmodels) * 10
n_workers = 8

Elapsed times:

Example Description Elapsed time (s)
1 serial, no Dask 339.95
2 Dask client.map, client.gather 43.49
3 Dask map_blocks 79.99
4 Dask delayed 56.55

The differences between your 2 and 4 should not be that large. I would expect this difference to vanish if you are doing multiple measurements. While you are using a different API, the two examples are functionally almost equivalent. The exception is the call to calc_params since Example 2 calculates it in the main process (not using the client) and Example 4 wraps it in a delayed and therefore calculates it on a cluster process. I typically recommend doing all heavy computations on the cluster instead of the main process. Example 4 is also preferred if you ever move to a distributed cluster since it does not require the local machine to participate in the computation which has benefits your micro benchmarks don’t capture (e.g. network).

A final change I would recommend looking into is a variation of Example 4. I see you are using

    client = get_client()
    y = client.gather(client.compute([params, results]))

which can instead be expressed by

import dask
# client = get_client()  # This is optional
y = dask.compute([params, results])

The benefit of the latter is that it can be run even without a Client. Not using the client will cause dask to use a very simple threadpool. If a client is available, it is used instead. This typically gives a little more freedom and makes writing tests, etc. a bit easier. For instance you can do dask.compute([params, results], scheduler='sync') to execute everything in the same process for debugging.
For running the actual computation, you should use a client/cluster, though. Simply initiate the client and dask will detect and use it automatically

Example 4b is based on the suggestion by @fjetter. I ran the examples again to check for consistency (see table below). As expected, the results are similar with the previous runs.

Example 4b (Dask, delayed)

import numpy as np
import time
import dask
from dask.distributed import Client

@dask.delayed
def calc_params(res: list) -> list:
    time.sleep(1)
    params = []
    for r in res:
        p = r * 1.1
        params.append(p)
    return params

@dask.delayed
def run_model(p: float) -> float:
    time.sleep(1)
    result = p + 1
    return result

def main():
    tic = time.perf_counter()

    nsteps = 20
    nmodels = 16
    init_params = np.random.random(nmodels) * 10

    params = []
    params.append(init_params)

    results = []

    for step in range(nsteps):
        step_results = []

        for m in range(nmodels):
            p = params[step][m]
            result = run_model(p)
            step_results.append(result)

        results.append(step_results)

        if step < nsteps - 1:
            ps = calc_params(step_results)
            params.append(ps)

    y = dask.compute([params, results])

    params, results = zip(*y)
    params = np.array(params[0])
    results = np.array(results[0])

    toc = time.perf_counter()

    print(f'\nDask elapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

Comparison of examples

Parameters:

nsteps = 20
nmodels = 16
init_params = np.random.random(nmodels) * 10
n_workers = 8

Elapsed times:

Example Description Elapsed time (s)
1 serial, no Dask 340.05 s
2 Dask client.map, client.gather 43.10 s
3 Dask map_blocks 79.99 s
4 Dask delayed 60.24 s
4b Dask delayed 61.30 s