Unique job names would be a start, but ultimately the job name should be human-readable and usable for grouping jobs, e.g. for post-processing. To make this more concrete, please see the example at the end.
That is not really in the spirit of Dask. With Dask, you want each worker to run plenty of tasks. And usually in an HPC system, you want long jobs that last at least a few minutes, preferably a few hours. How much time is your func taking?
func can take anything from 15 minutes to 15 hours. It is an ILP mathematical optimization problem. Thanks for mentioning submitit, will have a look.
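From a quick look at the submitit README, something like the following rough, untested sketch seems to map onto my use case (one job per scenario; the log folder name and parameter values are just placeholders mirroring the SLURM settings I use further down):

import submitit

# Untested sketch: one submitit job per scenario.
executor = submitit.AutoExecutor(folder="submitit_logs")
executor.update_parameters(
    timeout_min=30 * 60,      # 30 h walltime, expressed in minutes
    slurm_partition="32c",
    cpus_per_task=16,
)

jobs = []
for scenario in scenarios:
    # Per-scenario job name, analogous to --job-name in the sbatch-based version.
    executor.update_parameters(name=scenario.name)
    jobs.append(executor.submit(func, scenario, args, local_scenario_file))

# job.result() blocks until the corresponding job finishes and returns func's return value.
results = [job.result() for job in jobs]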
To give more background on my reasoning, here is an example of what I am doing right now with SLURM (using simple-slurm, which is just a thin wrapper around SLURM). There is only one task per job.
from simple_slurm import Slurm

# One SLURM job per scenario, 16 cores each.
slurm = Slurm(
    nodes=1,
    ntasks=1,
    cpus_per_task=16,
    job_name=f"{scenario_name}-{scenario_file.stem}",
    output=f"slurm-{Slurm.JOB_NAME}-{scenario_file.stem}-{Slurm.JOB_ID}.out",
    mail_type="BEGIN,END,FAIL",
    mail_user="slurmuser@compa.ny",
)

# Environment variables and the solver invocation go into the sbatch command line.
sbatch_str = (
    f"SOME_ENV_VAR=/some/path ANOTHER_ENV_VAR=solver SCENARIOS={scenario_name} "
    f"PATH=$PATH:~/opt/additional/binaries PYTHONPATH=. "
    f"python src/the_job.py {args.model_input} {args.model_data} {local_scenario_file}"
)
slurm.sbatch(sbatch_str)
This works and does what it should in terms of submitting to SLURM. The problem is that handling results and monitoring running jobs has to be done with the SLURM CLI tools (squeue, etc.). My idea was to use dask-jobqueue to get a web dashboard of jobs with associated logs, and to use the Dask futures API to handle job completion, e.g. copying result paths to network drives. Here is what I came up with:
from dask.distributed import Client, as_completed
from dask_jobqueue import SLURMCluster

# Extra sbatch directives applied to every worker job.
extra_directives = [
    "--mail-type BEGIN,END,FAIL",
    "--mail-user slurmuser@compa.ny",
    f"--job-name {local_scenario_file.stem}",
]

cluster = SLURMCluster(
    walltime="30:00:00",
    queue="32c",
    account="slurmuser",
    cores=16,
    memory="30 GB",
    job_extra_directives=extra_directives,
)

# One worker job per scenario.
cluster.scale(jobs=len(scenarios))
print(cluster.dashboard_link)

client = Client(cluster)

futures = []
for scenario in scenarios:
    print(scenario.name)
    future = client.submit(func, scenario, args, local_scenario_file)
    futures.append(future)

# Handle each result as soon as its scenario finishes.
for future in as_completed(futures):
    result = future.result()
    handle(result)

print("all jobs done")