Dask-mpi and mpirun - Can't scale up run with larger files and/or more cores/nodes on HPC cluster

I am trying to implement a distributed run over several nodes and cores on an HPC cluster using the Slurm submission system. I successfully tested the implementation of my code in an interactive session and it runs fine. However, the implementation using the batch system is proving difficult.
I can run the implementation on one node, but when I try to use a second node the process does not go any faster, which suggests to me that it keeps trying to run everything on the first node. I tried implementing this solution, but the problem persists.

Here is my slurm submission script:

#!/bin/bash -l

#SBATCH --nodes=2
#SBATCH --ntasks=96
#SBATCH --cpus-per-task=1
#SBATCH -t 00:15:00
#SBATCH --mem-per-cpu=4GB
#SBATCH --exclusive

#load modules for the job#
module load GCCcore/13.3.0 &&
module load Python/3.12.3 &&
module load GCC/12.3.0 &&
module load OpenMPI/4.1.5 &&
module load dask/2023.12.1 &&
module load mpi4py/3.1.4

export OMPI_MCA_btl='^uct,ofi'
export OMPI_MCA_pml='ucx'
export OMPI_MCA_mtl='^ofi'
export MALLOC_TRIM_THRESHOLD_=0
export MALLOC_ARENA_MAX=2

ulimit -n 8000

mpirun -np 96 --map-by ppr:48:node --mca btl_tcp_if_include eth0 python sbatch_process_fluxes.py  $TMP_STORE

and here is my setup for Dask in sbatch_process_fluxes.py:

import sys
import dask_mpi
from dask.distributed import Client

def main():
    local_dir = sys.argv[1]

    # With dask-mpi, rank 0 hosts the scheduler and ranks >= 2 host the workers;
    # the client code below is only meant to run on rank 1
    dask_mpi.core.initialize(local_directory=local_dir)

    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    if rank != 1:
        return

    with Client() as client:
        # process grid cells here
        ...

if __name__ == '__main__':
    main()

I don't get any error message; Slurm just hits the upper time limit and terminates the job when I try running this otherwise functioning implementation on files that are 60 GB in total. Each node has 265 GB of storage, and local_dir is the job-specific storage defined by Slurm ($TMP_STORE).

The grid-cell processing produces one file per grid cell, from which I can see that the processing starts: I get the first grid-cell file, but then processing gets stuck or becomes very slow, and the next output is only produced after 8 minutes.

Any idea what is going wrong?

Hi @Antje, welcome to the Dask community!

Could you clarify what you mean here? Using a LocalCluster? Which parameters?

Here again, what do you change between the two runs?

Did you try to take a look at the Dashboard, or to use the performance diagnostics (see the sketch at the end of this reply)?

Your submission script and Python script look good to me, but what are you doing after this? How do you use Dask?

There are many other parts in your overall setup, from infrastructure to user code, that might explain this behaviour.
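
For instance, something like the snippet below (just a sketch using the standard distributed API; the report file name is arbitrary) would show whether the workers actually ended up on both nodes and where the time goes:

from dask.distributed import Client, performance_report

# Assuming dask-mpi has already been initialized and this runs on the client rank
with Client() as client:
    # Which workers does the scheduler actually see, and on which hosts?
    workers = client.scheduler_info()["workers"]
    print(f"{len(workers)} workers connected:")
    for address, info in workers.items():
        print(address, info["host"], info["nthreads"])

    # Record a static performance report around the real computation
    with performance_report(filename="dask-report.html"):
        # ... run your delayed tasks / compute() here ...
        pass

The dask-report.html file can be opened in any browser afterwards, which is handy on a batch system where you can't always watch the live dashboard.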

Hey @guillaumeeb, thanks for taking the time.

Since my original post I have been able to make it run on 4 nodes, but performance is an issue. The larger my input files become, the more challenging it is for the job to finish.
I did not look at the dashboard, but I logged into the individual nodes after the job had been running for a couple of hours: memory usage was really high and CPU usage had dropped to around 20%, while the Slurm log gave me warnings like

distributed.utils_perf - INFO - full garbage collection released 135.33 MiB from 0 reference cycles (threshold: 9.54 MiB)
WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
distributed.core - INFO - Event loop was unresponsive in Scheduler for 7.34s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

My data is around 30 GB in total, and I read it in chunks to hand it to processing. I originally wanted each of the tasks to write a CSV file including every year, but I am currently testing whether this is causing the memory build-up, because the files only really got written at the end of the job when I tested with 400 tasks out of around 40,000.
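
For context, this is roughly the per-task write pattern I mean, as a simplified sketch (the output directory, file naming, and the flux computation are placeholders standing in for my real code):

import os
import pandas as pd
from dask import delayed

def process_and_write(coords, out_dir):
    lon, lat = coords
    # Placeholder for the real flux computation (wd.calc_flux in my code)
    df_grid = pd.DataFrame({"Lon": [lon], "Lat": [lat]})
    # Write the result from inside the task so nothing accumulates on the client
    out_file = os.path.join(out_dir, f"flux_{lon}_{lat}.csv")
    df_grid.to_csv(out_file, index=False)
    # Return only the file path instead of the full DataFrame
    return out_file

# tasks = [delayed(process_and_write)(c, "/path/to/output") for c in unique_coords]
# written = dask.compute(*tasks)  # requires `import dask`

In the version below I instead return the DataFrames and gather them with dd.from_delayed.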

Here is a minimal example:

import os
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client
from dask_mpi import initialize

import wd  # user module providing calc_flux and make_tuple (not shown here)

def main():
    initialize(memory_limit='auto')

    # Initialize Dask client for parallel processing
    with Client() as client:
        dask.config.set({'logging.distributed': 'error',
                         'timeouts': {'connect': '60s', 'tcp': '60s'},
                         'distributed.scheduler.worker-ttl': None,
                        })

        #--- Get paths
        path = '/path/to/storage'

        def process_coords(coords):
            lon, lat = coords
            print(f"Processing: {coords}")

            # Select the rows for this coordinate from the lazily-read Dask
            # DataFrames and materialise them as NumPy arrays
            data_at_coords = {
                position: {
                    key: all_data[position][key]
                    .loc[(all_data[position][key].Lon == lon) &
                         (all_data[position][key].Lat == lat)]
                    .reset_index(drop=True).iloc[:, 3:].compute().values  # Dask -> pandas -> NumPy for row-wise computing
                    for key in all_data[position]
                }
                for position in all_data
            }

            # Filter climate data for coords
            climate_ss = climate.loc[(climate.Lon == lon) &
                                     (climate.Lat == lat),
                                     [variables]].copy().reset_index(drop=True).compute().values

            # Process the grid cell
            df_grid = wd.calc_flux(climate_ss, data_at_coords, path)

            return df_grid

        # Read files with Dask
        base_path = path
        positions = [position1, position2]
        keys = [ten_keys]
        file_keys = [ten_file_keys]

        # Initialize mass dictionary
        all_data = {position: {} for position in positions}

        # Load the mass data
        for position in positions:
            for k, key in enumerate(keys):
                file_key = file_keys[k]
                file_path = os.path.join(base_path, f"file_{file_key}_{position.capitalize()}.txt")
                # Use Dask to read the files lazily
                all_data[position][key] = dd.read_csv(file_path, delim_whitespace=True)

        climate = dd.read_csv(base_path + 'climate.txt')
        climate['coords'] = climate.apply(wd.make_tuple, axis=1)
        unique_coords = climate.coords.unique().compute()  # materialise the coordinate list on the client

        # Create an empty list to hold delayed tasks
        delayed_results = []

        # Build one delayed task per coordinate
        for coords in unique_coords:
            delayed_task = delayed(process_coords)(coords)
            delayed_results.append(delayed_task)

        # Compute all the tasks in parallel and gather the results
        df_flux = dd.from_delayed(delayed_results).compute()

        # Do some things in the end

if __name__ == '__main__':
    main()

I know my code is not that efficient. I get the model output in 11 different files, which are all input to the coordinate-specific computation in process_coords, where I calculate fluxes for a number of years. df_flux has 200 rows for each of the roughly 40,000 coordinates.

So that’s it. Any help is really appreciated.

Your example is a bit hard to read, and I'm not sure what your real input data is. I'm also not sure why you are storing Dask DataFrames in a dict all_data, and it seems you are passing this dict to every delayed task?

I'm really not sure what you are trying to achieve, and I would need a simplified version of your code to understand. Perhaps show what you would do if it wasn't distributed or big data?
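
For example, something at this level of simplification would already help a lot; here is a rough, purely pandas sketch of what I mean (the file name and the calc_flux call are placeholders based on your snippet):

import pandas as pd

# Non-distributed version of the workflow as I understand it:
# read everything with pandas, then loop over the coordinates serially
climate = pd.read_csv("climate.txt", delim_whitespace=True)

results = []
for lon, lat in climate[["Lon", "Lat"]].drop_duplicates().itertuples(index=False):
    climate_ss = climate[(climate.Lon == lon) & (climate.Lat == lat)]
    # df_grid = wd.calc_flux(climate_ss, ...)  # placeholder for your per-cell computation
    df_grid = climate_ss                       # stand-in so the sketch runs
    results.append(df_grid)

df_flux = pd.concat(results, ignore_index=True)

From something like this it would be much easier to discuss how to map the loop onto Dask.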