dask_jobqueue.SLURMCluster: multi-threaded workloads and the effect of setting "cores"

Hi

Thanks for this project!

I have some questions which may be related to Resource allocation on SLURM cluster · Issue #616 · dask/dask-jobqueue · GitHub, even though I use cluster.scale(jobs=...) (batch jobs) instead of cluster.scale(n=...) (workers). I basically use dask_jobqueue to launch batch jobs on a SLURM machine and run client.map(workload_function, iterable) on them.

Consider this code:

import time
import platform

import pandas as pd
from dask.distributed import Client, get_worker
from dask_jobqueue import SLURMCluster

def workload(dummy):
    start_time = pd.Timestamp(time.time(), unit="s")
    time.sleep(5)
    return dict(
        start_time=start_time,
        host=platform.node(),
        worker_name=get_worker().name,
    )


cluster = SLURMCluster(
    queue="some_queue",
    processes=1,
    cores=8,
    memory="20GiB",
    walltime="00:20:00",
    local_directory="dask_tmp",
    log_directory="dask_log",
    scheduler_options={"dashboard_address": ":3333"},
    job_extra_directives=["--gres gpu:1"],
)


cluster.scale(jobs=1)
print(cluster.job_script())

client = Client(cluster)
futures = client.map(workload, range(16))
results = client.gather(futures)

df = pd.DataFrame(results)
df = df.sort_values(["start_time", "worker_name"])
print(df)

Created job script:

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e dask_log/dask-worker-%J.err
#SBATCH -o dask_log/dask-worker-%J.out
#SBATCH -p some_queue
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=20G
#SBATCH -t 00:20:00
#SBATCH --gres gpu:1

/path/to/bin/python -m distributed.cli.dask_worker tcp://10.0.22.60:46401 \
    --nthreads 8 --memory-limit 20.00GiB --name dummy-name --nanny \
    --death-timeout 60 --local-directory dask_tmp

From my understanding of the docs, a “dask worker” is one Python process (python -m distributed.cli.dask_worker), and the number of these processes per job is controlled by SLURMCluster(processes=1). So I have one batch job with one worker. The worker uses 8 threads (--nthreads 8) because of SLURMCluster(cores=8, ...).

Expected behavior

Each workload() call is processed sequentially, with access to 8 cores for multi-threading.

The workload in workload() is a dummy sleep here, but in reality it is a mix of multi-threaded CPU and some GPU computations, so I expect to be able to use 8 threads per workload() call.

Observed behavior

The dask worker starts as many workload() calls in parallel as it has threads, i.e. 8, so each workload() call effectively gets only one core.

Here is the relevant part of the created DataFrame:

                   start_time             host     worker_name
2023-11-05 19:02:56.418732405  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.418911695  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419003725  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419085026  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419140816  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419197798  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419270515  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419333696  node123.cluster  SLURMCluster-0

2023-11-05 19:03:01.422942400  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.434777021  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.434866428  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.434960127  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.435037613  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.435102224  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.435192823  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.436094046  node123.cluster  SLURMCluster-0

Out of the 16 workloads from client.map(workload, range(16)), we get two groups of 8 workload() calls. Within each group, all workloads start at the same time and run in parallel, presumably because the dask worker has 8 threads. The two groups start time.sleep(5) seconds apart.

I failed to find that behavior explained in the docs, so any hints as to whether this is intended behavior and why would be appreciated. My assumption so far was that one dask worker process handles one workload() call at a time, no matter how many threads it has.

(Possible) solution?

Following this comment, I set cores=1 and job_cpu=8 instead. This still requests #SBATCH --cpus-per-task=8 but starts distributed.cli.dask_worker with --nthreads 1. Now workload() calls are processed sequentially and each one can use multi-threading. In production I of course use many more jobs via cluster.scale(jobs=...) to parallelize across workers, roughly as in the sketch below.
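
For completeness, here is a sketch of that configuration (not my exact production setup; queue name, memory, walltime and job count are placeholders):

# Sketch of the cores=1 / job_cpu=8 setup described above.
cluster = SLURMCluster(
    queue="some_queue",
    processes=1,          # one dask worker process per batch job
    cores=1,              # one worker thread -> one workload() call at a time
    job_cpu=8,            # still request #SBATCH --cpus-per-task=8
    memory="20GiB",
    walltime="00:20:00",
    local_directory="dask_tmp",
    log_directory="dask_log",
    job_extra_directives=["--gres gpu:1"],
)
cluster.scale(jobs=4)     # parallelize across batch jobs instead of threads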

So is this the (or a?) recommended way to run multi-threaded workloads? Or am I not using dask_jobqueue correctly, or is this kind of workload and workflow out of dask_jobqueue’s scope?

Thanks!

Hi @elcorto, welcome to Dask community!

What you describe is not specific to dask-jobqueue, it is the general mechanism of Dask. Dask builds a task graph under the hood, and every task is executed on any available thread across all the Dask worker processes. You may be interested in the Scheduling page of the Dask documentation.

In your case, every workload() call is a task in the graph.
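
You can see the same behavior without SLURM at all, e.g. with a LocalCluster. A minimal sketch (not tested, just to illustrate the point):

import time
from dask.distributed import Client, LocalCluster

def workload(i):
    time.sleep(5)
    return i

# One worker process with 8 threads, like SLURMCluster(processes=1, cores=8):
# the scheduler runs up to 8 workload() tasks on that worker at the same time.
cluster = LocalCluster(n_workers=1, threads_per_worker=8)
client = Client(cluster)

futures = client.map(workload, range(16))
client.gather(futures)   # finishes in about 10 s: two rounds of 8 concurrent tasks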

Setting cores=1 and job_cpu=8 is one way to achieve what you want with dask-jobqueue; I would say it is the best and simplest way if all your workloads use the same number of cores.

Another option is to use resources, but this makes the deployment and the user code a bit more complex (see the sketch below).
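
As a rough sketch of the resources approach (untested; the resource name "slots" is arbitrary, and the worker_extra_args parameter name may differ depending on your dask-jobqueue version):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Tag each worker with an artificial resource "slots" and require one unit
# per task, so a worker runs at most one workload() call at a time while
# still having 8 threads available for that call.
cluster = SLURMCluster(
    queue="some_queue",
    processes=1,
    cores=8,
    memory="20GiB",
    worker_extra_args=["--resources", "slots=1"],
)
cluster.scale(jobs=4)

client = Client(cluster)
# workload() as defined in your first snippet
futures = client.map(workload, range(16), resources={"slots": 1})
results = client.gather(futures)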

Thanks a lot, that clears things up!