Hi everyone.
First, thank you for creating Dask. Since it seems very well suited to distributing work across multiple cluster nodes, I wanted to use it as part of my pipeline, running in a Conda environment, to compute the embarrassingly parallel parts.
For this, I wrote a class to start a SLURMCluster:
import os
import sys
import dask
from dask.distributed import Client, as_completed
from dask_jobqueue import SLURMCluster


class DaskSLURMcontrol():
    def __init__(self, nodes: int, memory_per_process: int, processes_per_node: int, identifier: str, result_path: str):
        self.nodes = nodes
        self.memory_per_node = '{}GB'.format(memory_per_process * processes_per_node)
        self.processes_per_node = processes_per_node
        # set output identifier for Dask workers (each produces its own out file)
        self.worker_out = os.path.join(result_path, 'worker_slurm_out', identifier)
        self.cluster_kwargs = self.read_slurm_env_vars()
        # start the cluster and store it on the instance
        self.start_cluster()

    def get_cluster(self) -> SLURMCluster:
        return self.cluster

    def read_slurm_env_vars(self) -> dict:
        slurm_cluster_kwargs = {
            'cores': self.processes_per_node,      # threads per job (if cores <= processes, each worker gets --nthreads=1)
            'processes': self.processes_per_node,  # cut up each job into this many worker processes
                                                   # (default is roughly sqrt(cores); here we use as many processes as
                                                   # cores, so each process runs 1 thread, since the tasks are bound by the GIL)
            'memory': self.memory_per_node,        # memory per node; '0GB' does not work: SLURM would accept it, but Dask fails
            # https://discourse.pangeo.io/t/when-using-dask-slurmcluster-can-i-avoid-passing-the-memory-argument/2362/7
            'walltime': '00:30:00',
            'job_extra_directives': ['--hint=nomultithread',
                                     '--exclusive',
                                     '--partition=xeon',
                                     f'--output={self.worker_out}_%j.out',
                                     ],
            'job_script_prologue': [
                'source path_to_miniconda/etc/profile.d/conda.sh',
                'conda activate dask',  # activate the conda environment
                'python3 -c "import sys; print(sys.executable)"',  # print the python executable
                'python -c "import dask; print(dask.__version__)"',  # print the dask version
            ]
        }
        return slurm_cluster_kwargs

    def start_cluster(self) -> None:
        cluster = SLURMCluster(**self.cluster_kwargs)
        # scale the cluster to the desired number of nodes: self.nodes jobs are submitted, forming the worker pool
        cluster.scale(jobs=self.nodes)
        self.cluster = cluster

    def close_cluster(self) -> None:
        # close the cluster (the client is closed in the with statement)
        self.cluster.close()
This class is used to set up the cluster, which is then passed to a Dask process wrapper that works through a list of inputs:
from typing import Callable


def daskprocess_wrapper(cluster: DaskSLURMcontrol, parallel_args: list[tuple], func: Callable) -> list:
    """
    Apply a function to a list of arguments using Dask with processes. The arguments are passed as tuples
    and are unpacked within the function. Dask is executed asynchronously, but the order of the results is guaranteed.

    Args:
        cluster (DaskSLURMcontrol): The DaskSLURMcontrol instance holding the cluster information.
        parallel_args (list[tuple]): A list of tuples, where each tuple contains the arguments for the function.
        func (Callable): The function to apply to the arguments.

    Returns:
        list: A list of results from the function applied to the arguments, in the order they were provided.
    """
    if isinstance(cluster, DaskSLURMcontrol):
        # get the cluster object from the control instance
        client = Client(cluster.get_cluster())
        # this works, but is not distributed:
        # client = Client(n_workers=8)
    else:
        client = Client(scheduler_file=cluster)
    # check the number of workers
    print(client.scheduler_info())
    # this will not work: https://dask.discourse.group/t/how-to-retrieve-the-requested-number-of-cores/798/6
    print('Cores {}'.format(sum(w['cores'] for w in client.scheduler_info()['workers'].values())))
    print('Threads {}'.format(sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())))
    # list of futures to compute
    futures = [client.submit(func, arg) for arg in parallel_args]
    # futures = client.map(func, parallel_args)
    print('Futures created')
    # Collect results as they complete: https://docs.dask.org/en/stable/futures.html
    # result_list = [future.result() for future in as_completed(futures)]
    # client.gather should be more efficient as it collects results concurrently,
    # but compared to collecting them as completed this is not certain and the documentation is not clear
    result_list = client.gather(futures)
    print('Results gathered')
    client.close()
    return result_list
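For completeness, the CLI script ties these two pieces together roughly like this (a simplified sketch; the argument handling, the identifier format, and the parse_target function are placeholders for the real code):

# simplified sketch of dask_distributed_assembly_parsing_cli.py; argument handling and parse_target are placeholders
import sys

def parse_target(args: tuple):
    # placeholder for the real per-target work
    return args

if __name__ == '__main__':
    n_targets, worker_nodes, processes_per_node, repetition, memory_per_process = map(int, sys.argv[1:6])
    result_path = sys.argv[6]

    print('Starting Cluster')
    control = DaskSLURMcontrol(nodes=worker_nodes,
                               memory_per_process=memory_per_process,
                               processes_per_node=processes_per_node,
                               identifier='{}_{}_{}_{}'.format(n_targets, worker_nodes, processes_per_node, repetition),
                               result_path=result_path)

    parallel_args = [(i,) for i in range(n_targets)]  # placeholder inputs
    print('Running repetition {}...'.format(repetition))
    results = daskprocess_wrapper(control, parallel_args, parse_target)
    print('Done repetition {}'.format(repetition))

    print('Closing Cluster')
    control.close_cluster()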
I started the Python script using this class via a SLURM job:
#!/bin/sh
#SBATCH --time=01:00:00
#SBATCH --partition=xeon
#SBATCH --hint=nomultithread
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=1
source path_to_miniconda3/etc/profile.d/conda.sh
conda activate dask
## call the python script
python3 ${exp_path}dask_distributed_assembly_parsing_cli.py ${n_targets} ${worker_nodes} ${processes_per_node} ${repetition} ${memory_per_process} ${result_path}
This works. As expected, with self.nodes=8, self.processes_per_node=8, and self.memory_per_process=2, it runs the main script on one node and submits 8 worker jobs, one per node.
The output is fine and as expected:
Starting Cluster
#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=15G
#SBATCH -t 00:30:00
#SBATCH --hint=nomultithread
#SBATCH --exclusive
#SBATCH --partition=xeon
#SBATCH --output=/output/dask_cli/results/worker_slurm_out/10000_8_8_1_%j.out
source /path_to_miniconda/etc/profile.d/conda.sh
source activate dask
/path_to_miniconda/envs/dask/bin/python3 -m distributed.cli.dask_worker tcp://10.34.59.1:36517 --name dummy-name --nthreads 1 --memory-limit 1.86GiB --nworkers 8 --nanny --death-timeout 60
Running repetition 1...
{'type': 'Scheduler', 'id': 'Scheduler-b30f609a-6737-48df-810e-ac86990ea283', 'address': 'tcp://10.34.59.1:36517', 'services': {'dashboard': 8787}, 'started': 1723204730.021974, 'workers': {}}
Cores 0
Threads 0
Futures created
Results gathered
Done repetition 1
Closing Cluster
First question:
Is there really no way to use the node where the main script is running as a worker as well, or to request all the resources up front and then have SLURMCluster draw its workers only from that allocation?
It seems inefficient to first wait for the resources to start the main script on the cluster, and then to wait again for the resources to start the worker jobs.
To avoid this, I thought I could simply start the main script from a terminal on the login node inside the Conda environment. Surprisingly, this does not work: the workers start, but stop after 30 seconds.
When checking the logs:
OSError: Timed out trying to connect to tcp://131.152.189.67:34916 after 30 s
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/path_to_miniconda/envs/dask/lib/python3.11/site-packages/distributed/cli/dask_worker.py"
Second question:
Since the first example works, I am not sure why the second does not. I was thinking it might be a permissions issue that prevents the workers from connecting to the scheduler. Could there be another reason?
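In case it helps narrow this down, this is roughly how I would test whether a compute node can reach the scheduler port on the login node (a minimal sketch; the address and port are taken from the timeout message above and change with every run):

# minimal reachability check, to be run on a compute node (e.g. inside a short interactive SLURM job);
# the scheduler address/port below are the ones from the timeout message and differ per run
import socket

scheduler_addr = ('131.152.189.67', 34916)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.settimeout(10)
    try:
        s.connect(scheduler_addr)
        print('TCP connection to scheduler succeeded')
    except OSError as exc:
        print('TCP connection to scheduler failed: {}'.format(exc))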
Recipe for my Conda environment:
conda create --name dask python=3.11
conda activate dask
pip install "dask[complete]" #2024.7.1
pip install dask-jobqueue #0.8.5
I am new to Dask, so I could just be missing something. Thank you very much for any insights, and for the great work.