Changing cluster job name from default dask-worker

Using dask-jobqueue with:

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(...)
cluster.scale(jobs=3)
client = Client(cluster)

def func():
    ...  # doing actual work

future = client.submit(func)
handle(future)

Where and how should I set the SLURM job name such that I can override the default “dask-worker” for each submitted job? I had a look at dask_jobqueue.SLURMCluster — Dask-jobqueue 0.8.1+1.gbaabb4a.dirty documentation and found the job_name attribute of the cluster. Should I change this attribute prior to each job submission to the client to match that specific job? That feels wrong to me. Am I missing something?

EDIT: It is possible to set the job name via job_extra_directives, e.g. "--job-name some_prefix", but then that job name is used for all jobs that are submitted. I am looking for a way to set the job name for each job that is submitted via client.submit().
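
For reference, a minimal sketch of what I tried (the cores/memory values here are just placeholders); the extra directive ends up in every submission script the cluster generates:

from dask_jobqueue import SLURMCluster

# Every job this cluster submits gets the same extra directives,
# including the --job-name one.
cluster = SLURMCluster(
    cores=16,
    memory="30 GB",
    job_extra_directives=["--job-name some_prefix"],
)
print(cluster.job_script())  # inspect the generated submission script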

Hi @fleimgruber, thanks for the question.

This is the attribute you should use to change the default dask-worker name. But yes, this name will be used for all the jobs generated by dask-jobqueue. There is no simple way of having a unique job name for every job submitted through dask-jobqueue. Maybe one could try to use some environment variable to achieve this, but I'm not even sure of that…

I think there is a misunderstanding here. client.submit() launches Dask tasks, not job scheduler jobs. One job launches one or several Dask workers, and on each of these workers several tasks will be launched by Dask. cluster.scale(jobs=3) is where the jobs are submitted.
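
To map that onto the snippet from your first post (a sketch; the resource arguments are placeholders and func is the placeholder function from that post):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=16, memory="30 GB")
cluster.scale(jobs=3)         # submits 3 Slurm jobs; each job starts one or more Dask workers
client = Client(cluster)

future = client.submit(func)  # creates one Dask task, scheduled on one of those workers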

Hi @guillaumeeb, thanks for your answers!

There is no simple way of having a unique job name for every job submitted through dask-jobqueue.

Can you estimate how much work it would be to implement this? I could have a look and try my hand at it. It would be a useful feature, e.g. for visibility in squeue or when sending mails about job status (via the --mail-user and --mail-type directives).

I think there is a misunderstanding here.

Ok, I think I got the job / worker / task concepts now, thank you for the explanation. With this knowledge, let me try to describe what I am trying to achieve: Run the logic of func inside a single task that is launched by a single worker that is launched by a single SLURM job. How should I use the dask and dask-jobqueue API for this?

Not something that can be done in a few minutes, so I won't look at it right now, especially considering the following point. But there is certainly a way to have a kind of counter that is incremented on each job submission; this is already done in the SpecCluster parent class of JobQueueCluster for the Dask worker names. We might even be able to reuse this to get unique job names. And having job names aligned with Dask worker names would be a nice feature.
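
As a very rough, untested sketch of such a workaround (it relies on dask-jobqueue internals, namely the job_cls hook and the generated job_header, so treat it as an assumption rather than a supported API):

import itertools

from dask_jobqueue import SLURMCluster
from dask_jobqueue.slurm import SLURMJob

_counter = itertools.count()

class NamedSLURMJob(SLURMJob):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Rewrite the "#SBATCH -J <name>" line that dask-jobqueue generated,
        # appending a per-submission counter to make the job name unique.
        suffix = next(_counter)
        self.job_header = self.job_header.replace(
            f"#SBATCH -J {self.job_name}",
            f"#SBATCH -J {self.job_name}-{suffix}",
        )

class NamedSLURMCluster(SLURMCluster):
    job_cls = NamedSLURMJob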

But what you want to achieve:

Run the logic of func inside a single task that is launched by a single worker that is launched by a single SLURM job.

is not really in the spirit of Dask. With Dask, you want each worker to launch plenty of tasks. And usually on an HPC system, you want long jobs that last at least a few minutes, preferably a few hours. How much time is your func taking?

I would suggest you take a look at submitit (since you're on a Slurm cluster), which might be closer to what you need.

Unique job names would be a start, but ultimately the job name should be human-readable and used for grouping jobs, e.g. for post-processing. To make this more concrete, please see the example at the end.

is not really in the spirit of Dask. With Dask, you want each worker to launch plenty of tasks. And usually on an HPC system, you want long jobs that last at least a few minutes, preferably a few hours. How much time is your func taking?

func can take anything from 15 minutes to 15 hours. It is an ILP mathematical optimization problem. Thanks for mentioning submitit, will have a look.

To give more background on my reasoning, see the following example for what I am doing right now with SLURM (using simple-slurm, which is just a thin wrapper around SLURM). There is only one task per job.

from simple_slurm import Slurm

slurm = Slurm(
    nodes=1,
    ntasks=1,
    cpus_per_task=16,
    job_name=f"{scenario_name}-{scenario_file.stem}",
    output=f"slurm-{Slurm.JOB_NAME}-{scenario_file.stem}-{Slurm.JOB_ID}.out",
    mail_type="BEGIN,END,FAIL",
    mail_user="slurmuser@compa.ny",
)
sbatch_str = f"SOME_ENV_VAR=/some/path ANOTHER_ENV_VAR=solver SCENARIOS={scenario_name} PATH=$PATH:~/opt/additional/binaries PYTHONPATH=. python src/the_job.py {args.model_input} {args.model_data} {local_scenario_file}"
slurm.sbatch(sbatch_str)

This works and does what it should in terms of submitting to SLURM. The problem with this is that handling results and monitoring running jobs must be done via the SLURM CLI tools (squeue, etc.). My idea was to use dask-jobqueue to get a web dashboard of jobs with associated logs, and to use the Dask futures API to handle job completion, e.g. copying result paths to network drives. Here is what I came up with:

from dask.distributed import as_completed
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

extra_directives = [
    "--mail-type BEGIN,END,FAIL",
    "--mail-user slurmuser@compa.ny",
    f"--job-name {local_scenario_file.stem}",
]
cluster = SLURMCluster(
    walltime="30:00:00",
    queue="32c",
    account="slurmuser",
    cores=16,
    memory="30 GB",
    job_extra_directives=extra_directives,
)

cluster.scale(jobs=len(scenarios))
print(cluster.dashboard_link)

client = Client(cluster)

futures = []
for scenario in scenarios:
    print(scenario.name)
    future = client.submit(func, scenario, args, local_scenario_file)
    futures.append(future)

for future in as_completed(futures):
    result = future.result()
    handle(result)

print("all jobs done")

I would suggest you take a look at submitit (since you're on a Slurm cluster), which might be closer to what you need.

Thanks again for that. I found the as_completed I was looking for. The dask dashboard and logs would have been nice (also to give team members a peek into the running tasks), but it is still more than what I had before.
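
For completeness, a rough sketch of the submitit pattern I ended up with (hedged: the parameter names follow submitit's AutoExecutor conventions as I understand them, and scenarios, func, handle and the partition name are the placeholders from my earlier posts):

import submitit

executor = submitit.AutoExecutor(folder="submitit_logs")
executor.update_parameters(
    timeout_min=30 * 60,      # 30 h walltime, expressed in minutes
    cpus_per_task=16,
    slurm_partition="32c",
    slurm_additional_parameters={
        "mail-type": "BEGIN,END,FAIL",
        "mail-user": "slurmuser@compa.ny",
    },
)

jobs = []
for scenario in scenarios:
    executor.update_parameters(name=scenario.name)  # per-job Slurm job name
    jobs.append(executor.submit(func, scenario))

for job in jobs:
    handle(job.result())  # blocks until the corresponding Slurm job finishes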


I think that ultimately you could manage to get what you want with Dask, but with some tweaks and not really clean workarounds.

If you wanted to use Dask and its features, I would recommend not thinking of dask-jobqueue (and Dask) as one job/worker per task. You should just start several workers/jobs and run several tasks on each worker. The post-processing/status logic would then be handled in Python code, not through Slurm features. For example, you could give a name to each Python task and send a mail at the end of the task (so at the end of the Future execution).
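
A hedged sketch of that idea (notify is a hypothetical helper, and cluster, scenarios, func and handle are the placeholders from the posts above); tasks get a human-readable name through the key argument of client.submit, and the mail/post-processing happens in Python once each future completes:

from dask.distributed import Client, as_completed

client = Client(cluster)  # cluster = SLURMCluster(...) as above

# Give each task a readable name; the key also shows up in the dashboard.
futures = {
    client.submit(func, scenario, key=f"scenario-{scenario.name}"): scenario
    for scenario in scenarios
}

for future in as_completed(futures):
    scenario = futures[future]
    handle(future.result())
    notify(f"{scenario.name} finished")  # hypothetical: send the status mail from Python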

I'm glad it suits your need; I feel it's also closer to what you were looking for. But do not hesitate to come back to Dask for more complex workflows or a different way of doing things.

You might be interested in
https://examples.dask.org/applications/embarrassingly-parallel.html
for future needs close to what you're doing.