Dask-mpi and mpirun - Can't scale up run with larger files and/or more cores/nodes on HPC cluster

I am trying to implement a distributed run over several nodes and cores on an HPC cluster using the Slurm submission system. I successfully tested my code in an interactive session, where it runs fine. However, the implementation through the batch system is proving difficult.
I can run the implementation on one node, but when I try to use a second node the process does not go any faster, which suggests to me that it keeps trying to run everything on the first node. I tried implementing this solution, but the problem persists.

Here is my slurm submission script:

#!/bin/bash -l

#SBATCH --nodes=2
#SBATCH --ntasks=96
#SBATCH --cpus-per-task=1
#SBATCH -t 00:15:00
#SBATCH --mem-per-cpu=4GB
#SBATCH --exclusive

#load modules for the job#
module load GCCcore/13.3.0 &&
module load Python/3.12.3 &&
module load GCC/12.3.0 &&
module load OpenMPI/4.1.5 &&
module load dask/2023.12.1 &&
module load mpi4py/3.1.4

export OMPI_MCA_btl='^uct,ofi'
export OMPI_MCA_pml='ucx'
export OMPI_MCA_mtl='^ofi'
export MALLOC_TRIM_THRESHOLD_=0
export MALLOC_ARENA_MAX=2

ulimit -n 8000

mpirun -np 96 --map-by ppr:48:node --mca btl_tcp_if_include eth0 python sbatch_process_fluxes.py  $TMP_STORE
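
A quick way to confirm that this mpirun line really places 48 ranks on each of the two nodes, before involving Dask at all, is a tiny mpi4py check run with the same invocation (check_ranks.py is just a hypothetical helper name, not part of my actual job):

# check_ranks.py - hypothetical helper to verify rank placement
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
host = MPI.Get_processor_name()

# Gather (rank, hostname) pairs on rank 0 and print a per-node summary
pairs = comm.gather((rank, host), root=0)
if rank == 0:
    nodes = sorted({h for _, h in pairs})
    print(f"{len(pairs)} ranks spread over {len(nodes)} node(s): {nodes}")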

And here is my Dask setup in sbatch_process_fluxes.py:

import sys
import dask_mpi
from dask.distributed import Client
def main():
    local_dir    = sys.argv[1]

    # dask-mpi turns MPI rank 0 into the scheduler and ranks >= 2 into
    # workers; rank 1 returns from initialize() and acts as the client
    dask_mpi.core.initialize(local_directory=local_dir)

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    if rank != 1:
        return

    with Client() as client:
        # process grid cells here
        ...


if __name__ == "__main__":
    main()
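
For completeness, a minimal sketch of what the client block could contain, assuming a hypothetical process_cell function and a cell_files list (both placeholders for the real per-grid-cell processing); the scheduler_info() check shows whether workers actually registered from both nodes:

from dask.distributed import Client, as_completed

def process_cell(path):
    # Placeholder for the real per-grid-cell processing
    return path

def process_cells(cell_files):
    with Client() as client:
        # scheduler_info() lists every registered worker with its host,
        # so this confirms whether the workers span both nodes
        workers = client.scheduler_info()["workers"]
        hosts = {w["host"] for w in workers.values()}
        print(f"{len(workers)} workers on hosts: {sorted(hosts)}")

        # Submit one task per grid cell and wait for the results
        futures = [client.submit(process_cell, path) for path in cell_files]
        for future in as_completed(futures):
            future.result()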

I don’t get any error message; Slurm just hits the time limit and terminates the job when I run this otherwise working implementation on files totalling 60 GB. Each node has 265 GB of storage, and local_dir is the job-specific storage defined by Slurm ($TMP_STORE).

The grid-cell processing produces one file per grid cell, from which I can see that processing does start: I get the first grid-cell file, but then processing gets stuck or becomes very slow, and the next output only appears after 8 minutes.

Any idea what is going wrong?

Hi @Antje, welcome to the Dask community!

Could you clarify what you mean here? Were you using a LocalCluster? With which parameters?
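
For instance, did the interactive test look roughly like this? The parameter values here are only an illustration of what I am asking about:

from dask.distributed import Client, LocalCluster

# Illustrative interactive setup -- n_workers, threads_per_worker and
# memory_limit are made-up values, just to show the kind of detail needed
cluster = LocalCluster(n_workers=48, threads_per_worker=1, memory_limit="4GB")
client = Client(cluster)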

Here again, what do you change between the two runs?

Did you try taking a look at the Dashboard, or using the performance diagnostics?
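
If reaching the dashboard from the compute nodes is awkward, you can also record a performance report to an HTML file and open it after the batch job finishes, roughly like this:

from dask.distributed import performance_report

# Wrap the computation; the resulting HTML contains the task stream,
# worker profiles and bandwidth information
with performance_report(filename="dask-report.html"):
    ...  # run the grid-cell processing here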

Your submission script and Python script look good to me, but what are you doing after this? How do you use Dask?

There are many other parts in your overall setup, from infrastructure to user code, that might explain this behaviour.

Your example is a bit hard to read, and I’m not sure what your real input data is. I’m also not sure why you are storing Dask DataFrames in a dict all_data, and it seems you are passing this dict to every delayed call?
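
To illustrate what I mean: the usual pattern is to give each delayed task only the piece of data it needs, for example one file per grid cell, rather than passing one big dict to every task. A rough sketch (process_one and cell_paths are made-up names):

import dask

@dask.delayed
def process_one(path):
    # Placeholder for the per-grid-cell work; each task only sees its own file
    return path

# cell_paths would be your list of input files, one per grid cell
cell_paths = ["cell_001.nc", "cell_002.nc"]
tasks = [process_one(p) for p in cell_paths]
results = dask.compute(*tasks)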

I’m really not sure what you are trying to achieve, and I would need a simplified version of your code to understand. Perhaps show what you would do if it weren’t distributed or big data?