Use dask to run many simulations with external tool in parallel in chunks

Hi,

I am very new to dask; before this I used tools like scoop for simple sampling or for optimization with deap.

With that setup I could easily run multiple CFD simulations in parallel (on one node with a predefined number of cores, and also distributed over multiple nodes with a predefined number of cores on each of them; this was without anything like Slurm).

These simulations were created step by step: when I chose to run 4 cases in parallel, 4 cases were created in 4 different directories, and when those simulations were done, the next 4 were copied and started.

Now I would like to switch from e.g. scoop to dask, and this is my current setup. The function that does all the work, like creating new cases with their own directories, is basically run_SIM.callSIM_script:

import dask
from distributed import LocalCluster
from dask.distributed import Client


# ------------------------------------------------------------------------------
import run_SIM
import pandas as pd

# ------------------------------------------------------------------------------
lazy_results = []

# ==============================================================================
def evaluate(x):
    parameterName = ['divScheme_Val', 'nPhi_Val', 'nTheta_Val', 'nCell_Val']

    # run_SIM.callSIM_script creates the case directory and runs the simulation
    y = run_SIM.callSIM_script(x, parameterName)
    return y


# ==============================================================================
if __name__ == "__main__":
    # ..............................................................................
    cluster = LocalCluster(n_workers=4, threads_per_worker=10)
    client = Client(cluster, asynchronous=True)

    param = pd.read_csv("parameter_v2.csv")
    param = param.dropna(axis=1)
    print(param.head())
    # param = param.transpose()
    paramlist = param.values


    # ..............................................................................
    for parameters in paramlist:
        print(parameters)
        lazy_result = dask.delayed(evaluate)(parameters)
        lazy_results.append(lazy_result)

    # ..............................................................................
    # dask.compute(*lazy_results) also works
    futures = dask.persist(*lazy_results)
    client.cluster.scale(4)
    results = dask.compute(*futures)
    print(results)

I am running this code just from the command line.

It seems that it tries to create all the simulation directories at the very beginning; this is not good when I want to run a huge number of smaller cases, or many larger cases (in the range of a few gigabytes each).

In my current tests I have a smaller number, about 100 simulations; each simulation would take about 1 h.

Is there a way to handle the step-by-step creation and starting? It would be great if you had some advice on this!

Thank you in advance!
Best Fab

A small adjustment with an additional loop improves this, but it is obviously not well suited for simulations where the run time varies from case to case.
Here I just added another for loop in combination with chunks:

# ------------------------------------------------------------------------------
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
# ==============================================================================
def evaluate(x):
    parameterName = ['divScheme_Val', 'nPhi_Val', 'nTheta_Val', 'nCell_Val']

    # run_SIM.callSIM_script creates the case directory and runs the simulation
    y = run_SIM.callSIM_script(x, parameterName)
    return y


# ==============================================================================
if __name__ == "__main__":
    # ..............................................................................
    cluster = LocalCluster(n_workers=4, threads_per_worker=10)
    client = Client(cluster, asynchronous=True)

    param = pd.read_csv("parameter_v2.csv")
    param = param.dropna(axis=1)
    print(param.head())
    paramL = param.values

    paramChunkL = list(chunks(paramL, 2))
    print(paramChunkL[0])
    # ..............................................................................
    for paramChunk in paramChunkL:
        print(100 * "=")
        lazy_results = []  # start a fresh task list for each chunk
        for parameters in paramChunk:
            print(parameters)
            lazy_result = dask.delayed(evaluate)(parameters)
            lazy_results.append(lazy_result)

        # ..............................................................................
        futures = dask.persist(*lazy_results)  # trigger computation in the background
        client.cluster.scale(4)
        results = dask.compute(*futures)
        print(results)

As mentioned, this is certainly not a good approach; it waits until all simulations in one chunk are done before the next chunk starts.
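What I would actually need is something that keeps a fixed number of simulations in flight and starts the next case as soon as any one finishes. A rough, untested sketch of that idea with distributed.as_completed (reusing evaluate and the parameter list paramL from above) would be:

from dask.distributed import Client, LocalCluster, as_completed

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    n_parallel = 4
    # submit only as many cases as should run at once
    futures = [client.submit(evaluate, p) for p in paramL[:n_parallel]]
    remaining = iter(paramL[n_parallel:])

    results = []
    seq = as_completed(futures)
    for future in seq:  # yields each future as soon as it finishes
        results.append(future.result())
        try:
            # immediately start the next case on the freed worker
            seq.add(client.submit(evaluate, next(remaining)))
        except StopIteration:
            pass  # no cases left to submit
    print(results)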

@fab6 Welcome!

Could you please share a minimal example, maybe with sleep() to simulate work, that can be reproduced locally? It’ll allow me to help you better!

It seems that it tries to create all the simulation directories at the very beginning; this is not good when I want to run a huge number of smaller cases, or many larger cases (in the range of a few gigabytes each).

Here’s what I think might be happening:

  • when you call persist, it triggers computation of lazy_results, and since you have n_workers=4, threads_per_worker=10, i.e. 40 total threads, it starts 40 parallel tasks.
  • client.cluster.scale(4) doesn’t really change anything because the cluster already has 4 workers.

So, if you start with cluster = LocalCluster(n_workers=4, threads_per_worker=1) – i.e., 1 thread per worker – I think you will be able to achieve the 4-way parallelism that you want. You also don’t need to use cluster.scale anymore.

(Please let me know if I’m misunderstanding something!)

A few more notes about your workflow:

  • Using an asynchronous client may lead to unexpected results, so I’d encourage you to use asynchronous=False, and
  • Calling compute right after persist isn’t really helpful; you can call `compute` directly here (see the sketch below).
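Putting these together, here’s a minimal sketch of what I mean, reusing your evaluate and paramlist:

import dask
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # 4 workers x 1 thread each = at most 4 simulations running at once
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)  # synchronous client (the default)

    lazy_results = [dask.delayed(evaluate)(p) for p in paramlist]

    # no persist and no cluster.scale needed
    results = dask.compute(*lazy_results)
    print(results)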

Hi,
thank you very much! With your comments applied, I am getting double the expected number of simulations, and for all cases there are complaints about directories not being available.

I will prepare a minimal example (with some simple creation of dummy simulation directories) and post it for easier discussion.

Thank you very much for the help!
Best Fab

Hi,

I now have a better example:

import dask
from distributed import LocalCluster
from dask.distributed import Client
import numpy as np


# ------------------------------------------------------------------------------
import time
import os
import errno

# ------------------------------------------------------------------------------
lazy_results = []

# ------------------------------------------------------------------------------
def createRunningDir(newDir):
    if not os.path.isdir(newDir):
        try:
            os.makedirs(newDir)
        except OSError as e:
            if e.errno != errno.EEXIST:  # ignore "directory already exists"
                raise

# ------------------------------------------------------------------------------
rootDir = os.getcwd()

# ==============================================================================
def evaluate(x):

    nr = x[0]
    print(100 * "-")
    newDir = rootDir + "/designs/run_" + str(nr)
    print(newDir)
    createRunningDir(newDir)
    os.chdir(newDir)  # note: changes the working directory of the whole process
    time.sleep(5)     # stand-in for the actual simulation
    os.system("touch file_" + str(nr))
    os.chdir(rootDir)

    y = 1
    return y


# ==============================================================================
if __name__ == "__main__":
    # ..............................................................................
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster, asynchronous=False)

    # ..............................................................................
    # Get parameters
    # param = pd.read_csv("parameter_v2.csv")
    # param = param.dropna(axis=1)
    # paramL = param.values
    paramL = np.array(
        [[0, 1, 2, 3, 4],
         [1, 2, 2, 3, 4],
         [2, 3, 2, 3, 4],
         [3, 4, 2, 3, 4],
         [4, 5, 2, 3, 4],
         [5, 2, 2, 3, 4],
         [6, 3, 2, 3, 4],
         [7, 4, 2, 3, 4],
         [8, 5, 2, 3, 4],
         [9, 5, 2, 3, 4],
         [10, 5, 2, 3, 4],
         [11, 5, 2, 3, 4],
         [12, 5, 2, 3, 4],
         [13, 5, 2, 3, 4],
         [14, 5, 2, 3, 4],
         [15, 5, 2, 3, 4],
         [16, 5, 2, 3, 4],
         [17, 5, 2, 3, 4],
         [18, 5, 2, 3, 4],
         [19, 5, 2, 3, 4],
         [20, 5, 2, 3, 4]])

    # ..............................................................................
    for parameters in paramL:
        print(100 * "=")
        print(parameters)
        lazy_result = dask.delayed(evaluate)(parameters)
        lazy_results.append(lazy_result)

    # ..............................................................................
    # futures = dask.persist(*lazy_results)  # trigger computation in the background
    dask.compute(*lazy_results, scheduler="threads")

It seems that I do not get exactly 4 parallel evaluations, but 8.

What is still strange is that handling the directories, and changing into them to execute a command, is not very robust. In the complex simulation example this meant that certain directories could not be found. The same shows up with the touched files here, which I expected to end up in their corresponding directories, but they do not: some are located in my rootDir, some are in a wrong run directory, and some are placed correctly…
It would be great if you had more suggestions on how to make this more robust!
Thank you in advance!
Best Fab

Hi, it seems that this is not the right approach; or am I missing something?

Thank you in advance!
Best Fab

dask.compute(*lazy_results, scheduler="threads")

Hi, I think the problem is that you start a LocalCluster, but then ask Dask to compute on the threaded scheduler. I’d guess your machine has 8 cores, or 4 multithreaded cores, and so you’re changing the working directory from several threads in the same process.

Just try with client.compute() instead!
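That is, roughly (with lazy_results being the list of delayed objects from your script):

    # submit to the cluster's worker *processes*; a chdir in one task
    # can then no longer affect tasks running in other workers
    futures = client.compute(lazy_results)  # returns a list of futures
    results = client.gather(futures)        # block until all results are in
    print(results)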

Hi,
thank you very much!
I now changed it to

    #dask.compute(*lazy_results, scheduler="threads")
    dask.compute(*lazy_results)

and as far as I can see this works now; the client.compute() call gave an error:

I will do some more checks with the working setup.

Again, thank you very much for your help!

Sorry, I meant client.compute(lazy_results) – i.e., passing the whole list of delayed objects.

Anyway, a dask.compute call without the scheduler kwarg should work the same and use the most recently created client!
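One more note on the directory robustness you mentioned: even with one thread per worker, I would avoid os.chdir inside tasks altogether and work with absolute paths instead, for example by passing cwd to subprocess.run. A rough sketch adapting your dummy evaluate (untested):

import os
import time
import subprocess

ROOT_DIR = os.getcwd()

def evaluate(x):
    nr = int(x[0])
    run_dir = os.path.join(ROOT_DIR, "designs", f"run_{nr}")
    os.makedirs(run_dir, exist_ok=True)  # safe if the directory already exists
    time.sleep(5)  # stand-in for the actual simulation
    # run the external command *in* run_dir without ever changing
    # the worker process's own working directory
    subprocess.run(["touch", f"file_{nr}"], cwd=run_dir, check=True)
    return 1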
