Difference when starting a SLURM cluster with Conda from SLURM job or from Terminal

Hi everyone.
First, thank you for creating Dask. Since it seems very suitable for distributing work across multiple cluster nodes, I wanted to use it as part of my pipeline in a Conda environment to compute the embarrassingly parallel parts.

For this, I wrote a class to start a SLURMcluster:

import os
import sys
import dask
from dask.distributed import Client, as_completed
from dask_jobqueue import SLURMCluster

class DaskSLURMcontrol():

    def __init__(self, nodes: int, memory_per_process: int, processes_per_node: int, identifier: str, result_path: str):
        self.nodes = nodes
        self.memory_per_node = '{}GB'.format(memory_per_process * processes_per_node)
        self.processes_per_node = processes_per_node
        
        # set output identifier for Dask workers (each produces its own out file)
        self.worker_out = os.path.join(result_path,'worker_slurm_out',identifier)
        
        self.cluster_kwargs = self.read_slurm_env_vars()
        
        # start the cluster and store it on the instance
        self.start_cluster()
        
    def get_cluster(self) -> SLURMCluster:
        return self.cluster

    def read_slurm_env_vars(self) -> dict:
        slurm_cluster_kwargs = {
            'cores': self.processes_per_node, # threads per job (if cores <= processes, each worker gets --nthreads=1)
            'processes': self.processes_per_node, # cut the job up into this many worker processes:
                # by default, processes ~= sqrt(cores)
                # here we use as many processes as cores assigned, so each process runs 1 thread, since the tasks are bound by the GIL
            'memory': self.memory_per_node, # memory per node; '0GB' does not work: SLURM would accept it, but Dask fails
            # https://discourse.pangeo.io/t/when-using-dask-slurmcluster-can-i-avoid-passing-the-memory-argument/2362/7
            'walltime': '00:30:00',
            'job_extra_directives': ['--hint=nomultithread', 
                                     '--exclusive', 
                                     '--partition=xeon',
                                     f'--output={self.worker_out}_%j.out',
                                     ],
            'job_script_prologue' : [
                'source path_to_miniconda/etc/profile.d/conda.sh',
                'conda activate dask',   # Activate your conda environment
                'python3 -c "import sys; print(sys.executable)"'  # print the python executable
                'python -c "import dask; print(dask.__version__)"' # print dask version
            ]            
        }

        return slurm_cluster_kwargs
    
    def start_cluster(self) -> None:
        cluster = SLURMCluster(**self.cluster_kwargs)
        # scale the cluster to the desired size: self.nodes jobs are submitted, forming the worker pool
        cluster.scale(jobs=self.nodes)
        self.cluster = cluster
    
    def close_cluster(self) -> None:
        # Close the cluster (the client is closed in the with statement)
        self.cluster.close()

This is used to set up the cluster, which is then passed to a Dask process wrapper that works through a list of inputs:

def daskprocess_wrapper(cluster: DaskSLURMcontrol, parallel_args: list[tuple], func: Callable) -> list: 
    """
    Apply a function to a list of arguments using Dask with processes. The arguments are passed as tuples
    and are unpacked within the function. Dask is executed asynchronously, but the order of the results is guaranteed.

    Args:
        cluster (DaskSLURMcontrol): The SLURMcontrol instance with the cluster information.
        parallel_args (list[tuple]): A list of tuples, where each tuple contains the arguments for the function.
        func (Callable): The function to apply to the arguments.

    Returns:
        list: A list of results from the function applied to the arguments in the order they are provided.
    """      

    if isinstance(cluster, DaskSLURMcontrol):
        # get the cluster object from the control instance
        client = Client(cluster.get_cluster())
        # this also works, but is not distributed:
        # client = Client(n_workers=8)
    else:
        # a scheduler file path was passed instead of a DaskSLURMcontrol instance
        client = Client(scheduler_file=cluster)

    # Check the number of workers
    print(client.scheduler_info())
    # this will not work: https://dask.discourse.group/t/how-to-retrieve-the-requested-number-of-cores/798/6
    print('Cores {}'.format(sum(w['cores'] for w in client.scheduler_info()['workers'].values())))
    print('Threads {}'.format(sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())))
    
    # list of futures to compute
    futures = [client.submit(func, arg) for arg in parallel_args]
    #futures = client.map(func, parallel_args)
    print('Futures created')
    # Collect results as they complete: https://docs.dask.org/en/stable/futures.html
    # result_list = [future.result() for future in as_completed(futures)]
    # client.gather should be more efficient, as results are gathered concurrently,
    # but compared to collecting them as completed this is not certain, and the documentation is not clear
    result_list = client.gather(futures)
    print('Results gathered')
    client.close()
    
    return result_list
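For completeness, the class and the wrapper are tied together in the CLI script roughly like this (the worker function and the concrete argument values below are simplified placeholders, not the real pipeline code):

def parse_target(args: tuple) -> dict:
    # placeholder worker function: unpacks its argument tuple and returns a small result
    target_id, path = args
    return {'target': target_id, 'path': path}

def main() -> None:
    # example values: 8 worker jobs with 8 processes of 2 GB each
    control = DaskSLURMcontrol(nodes=8, memory_per_process=2, processes_per_node=8,
                               identifier='10000_8_8_1', result_path='results')
    parallel_args = [(i, 'input_{}.txt'.format(i)) for i in range(10000)]
    results = daskprocess_wrapper(control, parallel_args, parse_target)
    control.close_cluster()
    print(len(results))

if __name__ == '__main__':
    main()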

I started the Python script using this class via a SLURM job:

#!/bin/sh
#SBATCH --time=01:00:00
#SBATCH --partition=xeon
#SBATCH --hint=nomultithread
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=1

source path_to_miniconda3/etc/profile.d/conda.sh
conda activate dask

## call the python script
python3 ${exp_path}dask_distributed_assembly_parsing_cli.py ${n_targets} ${worker_nodes} ${processes_per_node} ${repetition} ${memory_per_process} ${result_path}

This is working. As expected, with self.nodes=8, self.processes_per_node=8, and self.memory_per_process=2, it deploys the main script to one node and creates workers on 8 additional nodes.

The output is fine and as expected:

Starting Cluster
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=15G
#SBATCH -t 00:30:00
#SBATCH --hint=nomultithread
#SBATCH --exclusive
#SBATCH --partition=xeon
#SBATCH --output=/output/dask_cli/results/worker_slurm_out/10000_8_8_1_%j.out
source /path_to_miniconda/etc/profile.d/conda.sh
source activate dask
/path_to_miniconda/envs/dask/bin/python3 -m distributed.cli.dask_worker tcp://10.34.59.1:36517 --name dummy-name --nthreads 1 --memory-limit 1.86GiB --nworkers 8 --nanny --death-timeout 60

Running repetition 1...
{'type': 'Scheduler', 'id': 'Scheduler-b30f609a-6737-48df-810e-ac86990ea283', 'address': 'tcp://10.34.59.1:36517', 'services': {'dashboard': 8787}, 'started': 1723204730.021974, 'workers': {}}
Cores 0
Threads 0
Futures created
Results gathered
Done repetition 1
Closing Cluster

First question:
Is there no way to use the node where the main script is running as a worker as well, or to request all resources at the start and have SLURMCluster only claim those already-allocated resources?
It seems inefficient to first wait for the resources to start the main script on the cluster, and then to wait again for the resources for the worker jobs.

To avoid this, I thought I could simply start the main script from the terminal on the login node, inside the Conda environment. Surprisingly, this does not work: the workers start, but stop after 30 seconds.
When checking the logs:

OSError: Timed out trying to connect to tcp://131.152.189.67:34916 after 30 s

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/path_to_miniconda/envs/dask/lib/python3.11/site-packages/distributed/cli/dask_worker.py"

Second question:
As the first example is working, I am not sure why the second is not. I was thinking it might be a permission issue preventing the workers from connecting to the scheduler. Could there be another reason?

Recipe for my Conda environment:

conda create --name dask python=3.11
conda activate dask
pip install "dask[complete]" #2024.7.1
pip install dask-jobqueue  #0.8.5 

I am new to Dask and might just be missing something. Thank you very much for any insights, and for the great work.

Hi @RetoKrummenacher,

we had a very similar problem and came up with a solution that creates a scheduler and workers “by hand”, using the resources you request in your original SLURM script.

I’m not sure if this is behind our institute’s firewall or not:

Basically you have the following:

#!/bin/bash

#SBATCH --account computing.computing
#SBATCH -N 2
#SBATCH -p mpp

module purge
module load conda

conda activate sbatch_dask_cluster_script

NWORKERS=254 # One core for the scheduler and another one for the python script
MEM_PER_WORKER=2G # That adds up to a maximum of 256G per node, the RAM of each Albedo node

SCHEDULER_FILE="dask_scheduler.json"

# Start the scheduler
srun --ntasks=1 --nodes=1 --exclusive dask scheduler --scheduler-file $SCHEDULER_FILE --interface ib0 &

# Start the workers
srun --ntasks=$NWORKERS --nodes=2 --exclusive  dask worker --scheduler-file $SCHEDULER_FILE --memory-limit $MEM_PER_WORKER --interface ib0 &

sleep $((30*60)) # 30 minutes, adjust to the walltime

Importantly, you then in your Python part have something like this:

client = Client(scheduler_file='dask_scheduler.json')
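Since the srun'ed workers come up asynchronously, it can also help to block until they have registered with the scheduler before submitting work, for example (the worker count here is just an illustration):

from dask.distributed import Client

client = Client(scheduler_file='dask_scheduler.json')
# block until the expected number of workers has registered with the scheduler
client.wait_for_workers(n_workers=254)
print(len(client.scheduler_info()['workers']))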

One limitation I would love to solve is that this approach does not let you dynamically scale the Dask cluster object. I would love to be able to start a SLURM job, create some workers with the resources I initially requested, and then, on top of that, allow the program to dynamically create and add new workers from the main Python process. If anyone in the community knows how to do that, I would be happy to learn.

Happy hacking,
Paul

Hi @RetoKrummenacher, welcome to Dask community!

You are right. Note, though, that the main job is used for the Scheduler and your main Python Client script, which need a bit of resources themselves.

If you want to start all the resources at the same time, you should try to use Dask MPI.
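With Dask-MPI, the whole allocation is requested in one SLURM job and then split automatically: rank 0 runs the scheduler, rank 1 runs your client script, and the remaining ranks become workers. A minimal sketch, assuming dask-mpi and mpi4py are installed in the environment (launched inside the job script with something like srun -n 66 python dask_mpi_script.py):

from dask_mpi import initialize
from dask.distributed import Client

initialize()        # rank 0 becomes the scheduler, rank 1 continues with this script, the rest become workers
client = Client()   # connects to the scheduler started by initialize()

futures = client.map(lambda x: x ** 2, range(100))
print(sum(client.gather(futures)))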

The other solution is the one @pgierz proposes; it's not that hard to build a custom script for this.

It's not rare to see HPC systems where network connections between login nodes and compute nodes are blocked, or where the network interfaces are not the same. In any case, this is probably a network issue. You might be able to find a dask-jobqueue configuration that gets around it, but that is not always possible.
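For example, if the compute nodes can only reach each other over a dedicated network, pointing both the scheduler and the workers at that interface sometimes helps. A sketch, where the interface name ib0 is only an example (check the available interfaces on your system, e.g. with ip addr):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=8,
    processes=8,
    memory='16GB',
    walltime='00:30:00',
    interface='ib0',                         # network interface the workers bind to
    scheduler_options={'interface': 'ib0'},  # network interface the scheduler binds to
)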

Thank you @pgierz and @guillaumeeb

This clarifies some things and I will try the solution proposed by @pgierz.
