Hi
Thanks for this project!
I have some questions, which may be related to dask/dask-jobqueue#616 (Resource allocation on SLURM cluster), even though I use `cluster.scale(jobs=...)` (batch jobs) instead of `cluster.scale(n=...)` (workers). I basically use `dask_jobqueue` to launch batch jobs on a SLURM machine and submit work with `client.map(workload_function, iterable)`.
Consider this code:
```python
import time
import platform
import os

import pandas as pd
from dask.distributed import Client, get_worker
from dask_jobqueue import SLURMCluster


def workload(dummy):
    start_time = pd.Timestamp(time.time(), unit="s")
    time.sleep(5)
    return dict(
        start_time=start_time,
        host=platform.node(),
        worker_name=get_worker().name,
    )


cluster = SLURMCluster(
    queue="some_queue",
    processes=1,
    cores=8,
    memory="20GiB",
    walltime="00:20:00",
    local_directory="dask_tmp",
    log_directory="dask_log",
    scheduler_options={"dashboard_address": ":3333"},
    job_extra_directives=["--gres gpu:1"],
)

cluster.scale(jobs=1)
print(cluster.job_script())

client = Client(cluster)
futures = client.map(workload, range(16))
results = client.gather(futures)

df = pd.DataFrame(results)
df = df.sort_values(["start_time", "worker_name"])
print(df)
```
Created job script:

```bash
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e dask_log/dask-worker-%J.err
#SBATCH -o dask_log/dask-worker-%J.out
#SBATCH -p some_queue
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=20G
#SBATCH -t 00:20:00
#SBATCH --gres gpu:1

/path/to/bin/python -m distributed.cli.dask_worker tcp://10.0.22.60:46401 \
    --nthreads 8 --memory-limit 20.00GiB --name dummy-name --nanny \
    --death-timeout 60 --local-directory dask_tmp
```
From my understanding of the docs, a "dask worker" is one Python process (`python -m distributed.cli.dask_worker`), and the number of these is controlled by `SLURMCluster(processes=1)`. So I have one batch job with one worker. The worker uses 8 threads (`--nthreads 8`) because of `SLURMCluster(cores=8, ...)`.
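To double-check that mapping, I can ask the scheduler what each worker reports (a small sketch, assuming the `client` from the script above):

```python
# Sanity check: with processes=1 and cores=8 this prints a single worker
# reporting nthreads == 8.
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["name"], info["nthreads"])
```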
**Expected behavior**
Each `workload()` call is processed sequentially and has access to 8 cores for multi-threading. The workload here is a dummy sleep, but in reality it is a mix of multi-threaded CPU and some GPU computation, so I expect to be able to use 8 threads per `workload()` call.
**Observed behavior**
The dask worker starts as many `workload()` calls in parallel as it has threads, i.e. 8, so each workload can effectively use only one core. Here is the relevant part of the resulting `DataFrame`:
```
start_time                     host             worker_name
2023-11-05 19:02:56.418732405  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.418911695  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419003725  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419085026  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419140816  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419197798  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419270515  node123.cluster  SLURMCluster-0
2023-11-05 19:02:56.419333696  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.422942400  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.434777021  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.434866428  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.434960127  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.435037613  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.435102224  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.435192823  node123.cluster  SLURMCluster-0
2023-11-05 19:03:01.436094046  node123.cluster  SLURMCluster-0
```
Out of the 16 workloads from `client.map(workload, range(16))`, we get two groups of 8 `workload()` calls. Within each group, all workloads start at the same time and run in parallel, presumably because the dask worker has 8 threads. The two groups are `time.sleep(5)` seconds apart.
I could not find this behavior explained in the docs, so any hint as to whether it is intended, and why, would be appreciated. My assumption so far was that one dask worker process handles one `workload()` call at a time, no matter how many threads it has.
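For what it's worth, I can reproduce the same pattern locally without SLURM, which suggests it comes from distributed's scheduling (one task per worker thread) rather than from `dask_jobqueue` itself. A minimal sketch:

```python
# Minimal local reproduction (no SLURM): one worker process with 8 threads
# runs up to 8 tasks concurrently, so 16 five-second sleeps finish in two
# batches of roughly 5 seconds each.
import time
from dask.distributed import Client, LocalCluster


def sleepy(i):
    time.sleep(5)
    return i


if __name__ == "__main__":
    with LocalCluster(n_workers=1, threads_per_worker=8) as cluster, Client(cluster) as client:
        client.gather(client.map(sleepy, range(16)))
```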
**(Possible) solution?**
Following this comment, I set `cores=1` and use `job_cpu=8` instead. This still requests `#SBATCH --cpus-per-task=8`, but starts the worker with `distributed.cli.dask_worker --nthreads 1`. Now the `workload()` calls are processed sequentially and each one can use multi-threading. In production I of course use many more `cluster.scale(jobs=...)` to parallelize.
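Concretely, the adjusted cluster looks roughly like this (only `cores` and `job_cpu` changed relative to the script above; the job count is just an example):

```python
# Adjusted cluster: one thread per dask worker (--nthreads 1), while SLURM
# still reserves 8 CPUs per job (--cpus-per-task=8) via job_cpu.
cluster = SLURMCluster(
    queue="some_queue",
    processes=1,
    cores=1,      # one task at a time per dask worker
    job_cpu=8,    # but still request 8 CPUs from SLURM for multi-threading
    memory="20GiB",
    walltime="00:20:00",
    local_directory="dask_tmp",
    log_directory="dask_log",
    scheduler_options={"dashboard_address": ":3333"},
    job_extra_directives=["--gres gpu:1"],
)
cluster.scale(jobs=4)  # e.g. several such jobs in production
```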
So, is this the (or one) way to run multi-threaded workloads? Or am I perhaps not using `dask_jobqueue` correctly, or is this kind of workload and workflow simply out of `dask_jobqueue`'s scope?
Thanks!