Parallelisation by multiprocessing not multithreading on SLURMCluster

Hello everyone,

I am trying to set up a Dask cluster on a SLURMCluster. The HPC cluster I am on has 128 cores per node each of them being dual threaded (I just put cores=20 below to have a smaller example). I want to do the parallelisation by multiprocessing and not by multithreading to avoid Python GIL issues.

Setting up the cluster in the following way (multithreaded scenario) works per se fine, but then I run into issues regarding the GIL:
content of jobqueue.yaml file:

jobqueue:
  slurm:
    name: dask-worker

    # Dask worker options
    cores: 20                 # Total number of cores per job
    memory: "500 GB"                # Total amount of memory per job
    processes: 1                # Number of Python processes per job

    interface: ib0             # Network interface to use like eth0 or ib0
    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: "</some/tmp/dir>"       # Location of fast local storage like /scratch or $TMPDIR
    extra: []

    # SLURM resource manager options
    shebang: "#!/usr/bin/env bash"
    queue: <some/queue>
    project: <some/project>
    walltime: '00:30:00'
    env-extra: []
    job-cpu: null
    job-mem: null
    job-extra: []
    log-directory: null

    # Scheduler options
    scheduler-options: {}

I create the cluster and client as follows (for a minimal example here scaling to 2 jobs):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(nanny=False)
cluster.scale(jobs=2)
client = Client(cluster)

As expected leading to the following client/cluster: <Client: 'tcp://10.14.0.42:43047' processes=2 threads=40, memory=0.91 TiB>

So far so good, but unfortunately this does the parallelisation by multi-threading and therefore leads to GIL issues when doing Python computations.

If I try to set up a multiprocessing, single-threaded scenario like so:
content of jobqueue.yaml file:

jobqueue:
  slurm:
    name: dask-worker

    # Dask worker options
    cores: 20                 # Total number of cores per job
    memory: "500 GB"                # Total amount of memory per job
    processes: 20                # Number of Python processes per job

    interface: ib0             # Network interface to use like eth0 or ib0
    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: "</some/tmp/dir>"       # Location of fast local storage like /scratch or $TMPDIR
    extra: []

    # SLURM resource manager options
    shebang: "#!/usr/bin/env bash"
    queue: <some/queue>
    project: <some/project>
    walltime: '00:30:00'
    env-extra: []
    job-cpu: null
    job-mem: null
    job-extra: []
    log-directory: null

    # Scheduler options
    scheduler-options: {}

I again create the cluster and client as follows (for a minimal example again scaling to 2 jobs) but there I cannot not set nanny=False:

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster()
cluster.scale(jobs=2)
client = Client(cluster)

This leads to the following client/cluster: <Client: 'tcp://10.14.0.42:33685' processes=40 threads=40, memory=0.91 TiB>
Which looks like a single-threaded scenario.

The problem is know, when I start an example computation passing this client I run into the problem that daemonic processes are not allowed to have children (the reason why I had set nanny=False before).

How can I achieve the parallelisation via processes and not threads given the problems described above? Would it maybe be better to use MPI instead of dask_jobqueue?

I am very happy to follow any general changes in case I am missunderstanding some basics here.

Thank you very much!

1 Like

Hi @verakye,

So I guess the tasks you’re submitting to your dask cluster do start other processes under the hood? I’m a little bit confused on why it would complain if not, as your workers seem to be correctly started with the no nanny flag…

Anyway, you can also look at distributed.worker.daemon distributed configuration. See Configuration — Dask documentation.

Cheers!

1 Like