Hi,
I am very new to Dask and previously used tools like scoop for simple sampling or for optimization with deap.
With that setup I could easily run multiple CFD simulations in parallel (on one node with a predefined number of cores, and also distributed over multiple nodes with a predefined number of cores on each of them; this was without anything like Slurm).
These simulations were created step by step: when I chose to run 4 cases in parallel, 4 cases were created in 4 different directories, and once those simulations were done, the next 4 were copied and started.
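To make that batch-wise pattern concrete, here is a rough sketch (`run_case` is a hypothetical placeholder for what the real code does: create the case directory, copy the template, start the solver):

```python
# Sketch of the old batch-wise workflow: run cases in chunks of
# `batch_size`, waiting for each chunk to finish before the next
# chunk's directories are created.
from concurrent.futures import ThreadPoolExecutor

def run_case(case_id):
    # stand-in for the actual CFD run (directory setup + solver call)
    return case_id ** 2

def run_in_batches(case_ids, batch_size=4):
    results = []
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        for start in range(0, len(case_ids), batch_size):
            batch = case_ids[start:start + batch_size]
            # files for this batch are created only now, not upfront
            results.extend(pool.map(run_case, batch))
    return results

print(run_in_batches(list(range(10))))
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```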
Now I would like to switch from scoop to Dask; my current setup is below. The function that does all the work, such as creating new cases with their own directories, is basically run_SIM.callSIM_script:
import dask
from distributed import LocalCluster
from dask.distributed import Client
# ------------------------------------------------------------------------------
import run_SIM
import pandas as pd
# ------------------------------------------------------------------------------
lazy_results = []
# ==============================================================================
def evaluate(x):
    parameterName = ['divScheme_Val', 'nPhi_Val', 'nTheta_Val', 'nCell_Val']
    y = run_SIM.callSIM_script(x, parameterName)
    return y
# ==============================================================================
if __name__ == "__main__":
    # ..........................................................................
    cluster = LocalCluster(n_workers=4, threads_per_worker=10)
    # asynchronous=True is only meant for use inside async code;
    # in a plain script the client should be created synchronously
    client = Client(cluster)
    param = pd.read_csv("parameter_v2.csv")
    param = param.dropna(axis=1)
    print(param.head())
    # param = param.transpose()
    paramlist = param.values
    # ..........................................................................
    for parameters in paramlist:
        print(parameters)
        lazy_result = dask.delayed(evaluate)(parameters)
        lazy_results.append(lazy_result)
    # ..........................................................................
    # works: dask.compute(*lazy_results)
    futures = dask.persist(*lazy_results)  # starts all tasks in the background
    client.cluster.scale(4)
    results = dask.compute(*futures)
    print(results)
I am running this code directly from the command line.
It seems that all simulation directories are created at the very beginning; this is not good when I want to run a huge number of smaller cases, or many larger cases (in the range of a few gigabytes each).
For my current tests I have a smaller number, about 100 simulations, and each simulation takes about 1 h.
Is there a way to handle the step-by-step creation and starting of cases? It would be great if you had any advice on this!
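What I have in mind is something like a rolling window, where a new case is only created and submitted once a running one finishes. A sketch of that idea, under the assumption that `client.submit` together with `distributed.as_completed` is the right tool (`evaluate` as in the script above; `run_throttled` is my own hypothetical helper name):

```python
# Rolling-window submission: at most `window` tasks exist at any time,
# so case directories are created gradually instead of all upfront.
from dask.distributed import Client, as_completed

def run_throttled(client, func, params, window=4):
    params = iter(params)
    # seed the window with the first `window` tasks
    futures = [client.submit(func, p) for _, p in zip(range(window), params)]
    ac = as_completed(futures)
    results = []
    for future in ac:
        results.append(future.result())
        # as soon as one case finishes, create and submit the next one
        try:
            ac.add(client.submit(func, next(params)))
        except StopIteration:
            pass  # no cases left to submit
    return results
```

Results arrive in completion order here, not submission order, which would be fine for my use case.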
Thank you in advance!
Best Fab