How does the batch runner set up Dask workers?

Hello everyone,

I am working on an HPC cluster and have been using the batch runner from dask-jobqueue. I have to use it because SSH tunneling is disabled, supposedly for security reasons. Furthermore, the admins prefer large jobs of one or more full nodes and will manually downgrade the priority of users who launch a lot of small jobs in shared mode.

I am currently using this script:

#!/bin/bash
#SBATCH --job-name=dask
#SBATCH --constraint=GENOA
#SBATCH --nodes=1
#SBATCH --exclusive
#SBATCH --time=1:00:00

### handle environment
module purge
module load cpe/23.12
module load cray-python
module load conda
module list
conda activate my_env

# Fail on first error (including failures inside pipelines)
set -euo pipefail
set -x

# Check if logs exists and is a directory, create if needed
if [ -e "logs" ] && [ ! -d "logs" ]; then
    echo "Error: 'logs' exists but is not a directory" >&2
    exit 1
elif [ ! -d "logs" ]; then
    mkdir -p logs || {
        echo "Error: Failed to create logs directory" >&2
        exit 1
    }
fi

srun --exclusive --nodes=1 \
     --ntasks-per-node=192 \
     --cpus-per-task=1 \
     --threads-per-core=1 \
     python src/features/my_script.py --execution_mode batch_runner \
     2>&1 | tee "logs/run_${SLURM_JOB_ID}.log"

# Check exit status
if [ $? -ne 0 ]; then
    echo "Error: Python script failed"
    exit 1
fi

I have multiple questions about how the workers are/can be defined:

  • Intuitively, it would make more sense to define workers with multiple cores and more memory, to avoid unnecessary communication between a myriad of small one-core workers (the current implementation). Can I just define the size of the workers using srun arguments?

  • I can have up to two threads per core, but I have no idea whether it would actually change anything, since my code runs single-threaded and Dask-based multithreading doesn’t seem to be an option for a batch runner. I can do multi-node, but I would also be fine with multiple one-node jobs. It would make sense to use multithreading because all the workers’ memory could be located on the same node, and it would be nice to take full advantage of the two logical threads per core. Is there a particular client/batch runner option to enable it?

  • Is my best option to actually launch multiple local clusters with srun, if I can partition my jobs into smaller tasks?

Hi @EtienneReboul, welcome to Dask community!

If I’m not mistaken, looking at the source code, I can see that BaseRunner, and so SLURMRunner, takes a worker_options kwarg. Through this kwarg, you should be able to pass any argument to the Worker class, including nthreads and memory_limit.

So I would say that you’ll need to tweak things independently: on the SLURM srun side to determine the CPUs/threads you want to book, and on the Dask SLURMRunner side to specify the Worker threads and/or memory. Something like:

with SLURMRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json",
                 worker_options={"nthreads": 4, "memory_limit": "20GiB"}) as runner:
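
For context, the full pattern from the dask-jobqueue runners documentation looks roughly like this (the scheduler_file path and the worker_options values are placeholders to adapt):

from dask.distributed import Client
from dask_jobqueue.slurm import SLURMRunner

# Every task launched by srun runs this same script; the runner decides which
# process becomes the scheduler, which ones become workers, and which one runs
# the client code in the body of the context manager.
with SLURMRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json",
                 worker_options={"nthreads": 4, "memory_limit": "20GiB"}) as runner:
    with Client(runner) as client:
        # Wait until all expected workers have connected before submitting work.
        client.wait_for_workers(runner.n_workers)
        ...  # your computation goes here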

cc @jacobtomlinson


Thank you so much for your reply @guillaumeeb,

I think it makes a lot of sense to do it that way. My initial guess would be to assign:

nthreads     = cpus-per-task * threads-per-core
memory_limit = cpus-per-task * mem-per-cpu

I could either pass those values to the script explicitly through the argument parser, or export them as environment variables. Does that initial guess make sense / sound accurate?
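
Something like this is what I have in mind (just a sketch; it assumes --cpus-per-task and --mem-per-cpu are set in the batch script so that SLURM exports SLURM_CPUS_PER_TASK and SLURM_MEM_PER_CPU, and it glosses over MB vs MiB):

import os

from dask_jobqueue.slurm import SLURMRunner

# SLURM only exports these variables when the matching sbatch/srun options are set.
cpus_per_task = int(os.environ["SLURM_CPUS_PER_TASK"])
mem_per_cpu_mb = int(os.environ["SLURM_MEM_PER_CPU"])  # in MB by default

worker_options = {
    # possibly multiplied by threads-per-core if hyperthreads are not already counted
    "nthreads": cpus_per_task,
    "memory_limit": f"{cpus_per_task * mem_per_cpu_mb}MB",
}

with SLURMRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json",
                 worker_options=worker_options) as runner:
    ...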

Secondly, I’m struggling to understand the Worker API: is there a specific option to disable spilling to disk, or to ensure that only one file per worker is spilled to disk? My script already writes a lot of Avro files, and I don’t want unforeseen files since I am already stretching my file quota thin. Any pointers on how to pass that through the SLURMRunner?

Thank you in advance

Yes, I think so!

Well, I’m not sure what you want here. There are options, like memory_spill_fraction or max_spill, that you can define through the Worker class or through the Dask configuration. But ideally, you never want to spill. You can also set the spilling directory through local_directory, typically a local disk if available, or scratch space.
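
For example, to disable spilling entirely you could set the relevant configuration before creating the runner, something along these lines (untested; the keys come from the distributed.worker.memory configuration section, and the local_directory value is only a placeholder). Since every srun task runs the same script, configuration set at the top of the script should also apply to the worker processes:

import dask

from dask_jobqueue.slurm import SLURMRunner

# Turn off the "target" and "spill" thresholds so workers never write data to
# disk; they will pause near their memory limit instead of spilling.
dask.config.set({
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
})

with SLURMRunner(scheduler_file="/path/to/shared/filesystem/scheduler-{job_id}.json",
                 worker_options={"local_directory": "/tmp/dask-worker-space"}) as runner:
    ...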

Couldn’t you avoid writing so many files?