I am trying to implement a distributed run over several nodes and cores on an HPC cluster using the Slurm submission system. I successfully tested the implementation of my code in an interactive session and it runs fine. However, getting it to work through the batch system is proving difficult.
I can run the implementation on one node, but when I try to use a second node the process does not get any faster, which suggests to me that it keeps trying to run everything on the first node. I tried implementing this solution, but the problem persists.
Here is my setup for Dask in sbatch_process_fluxes.py:
import sys

import dask_mpi
from dask.distributed import Client
from mpi4py import MPI

def main():
    # Job-specific scratch directory passed in from the sbatch script ($TEMP_STORE)
    local_dir = sys.argv[1]

    # Start the scheduler (rank 0) and the workers (ranks 2+);
    # only rank 1 continues past initialize() and runs the client code below
    dask_mpi.core.initialize(local_directory=local_dir)

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    if rank != 1:
        # only rank 1 should reach this point; the check is just a safeguard
        return

    with Client() as client:
        # process grid cells here
        ...

if __name__ == "__main__":
    main()
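To check whether workers from the second node actually register, a small sanity check right after the client is created might help. This is only a sketch; the expected worker count is a placeholder (with dask-mpi it should be the number of MPI ranks minus two, since rank 0 is the scheduler and rank 1 runs the client code):

from dask.distributed import Client

with Client() as client:
    # Wait until the expected number of workers has connected
    expected_workers = 4          # placeholder: total MPI ranks - 2
    client.wait_for_workers(n_workers=expected_workers, timeout=120)

    # Print which host each worker runs on; if every address points to the
    # same node, the second node never joined the cluster
    for addr, info in client.scheduler_info()["workers"].items():
        print(addr, info.get("host"), info.get("nthreads"))

    # Dashboard link, in case it is reachable from a login node
    print(client.dashboard_link)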
I don’t get any error message; Slurm just hits the upper time limit and terminates the job when I run this working implementation on files that are 60 GB in total. Each node has 265 GB of storage, and local_dir is the job-specific storage defined by Slurm ($TEMP_STORE).
The grid-cell processing produces one file per grid cell, from which I can see that processing starts: I get the first grid-cell file, but then processing gets stuck or becomes very slow, and the next output is only produced after 8 minutes.
Since my original post I have been able to make it run on 4 nodes, but performance is an issue: the larger my input files become, the harder it is for the job to finish.
I did not look at the dashboard, but I logged into the individual nodes of the job after it had been running for a couple of hours: memory usage was very high and CPU usage had dropped to around 20%, while the Slurm log gave me warnings like
- distributed.utils_perf - INFO - full garbage collection released 135.33 MiB from 0 reference cycles (threshold: 9.54 MiB)
- WARNING - full garbage collections took 12% CPU time recently (threshold: 10%)
- distributed.core - INFO - Event loop was unresponsive in Scheduler for 7.34s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
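Rather than logging into each node by hand, worker memory can also be sampled from the client process (inside the with Client() as client block); a rough sketch, using psutil, which is already a dependency of distributed:

import psutil

def worker_rss():
    # Resident memory of the worker process, in bytes
    return psutil.Process().memory_info().rss

# client.run executes the function on every worker and returns {worker_address: result}
memory_per_worker = client.run(worker_rss)
for addr, rss in memory_per_worker.items():
    print(addr, f"{rss / 1e9:.1f} GB")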
My data is around 30 GB in total, and I read it in chunks to hand it to the processing. I originally wanted each task to write a CSV file covering every year, but I am currently testing whether this is causing the memory build-up, because files only really got written at the end of the job when I tested with 400 tasks out of around 40,000.
Here is a minimal example:
import os

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client
from dask_mpi import initialize

import wd  # own module providing calc_flux and make_tuple

def main():
    # Start the scheduler and workers via MPI; only the client rank continues below
    initialize(memory_limit='auto')

    # Initialize Dask client for parallel processing
    with Client() as client:
        dask.config.set({'logging.distributed': 'error',
                         'timeouts': {'connect': '60s', 'tcp': '60s'},
                         'distributed.scheduler.worker-ttl': None,
                         })

        # --- Get paths
        path = '/path/to/storage'

        def process_coords(coords):
            lon, lat = coords
            print(f"Processing: {coords}")
            # Select the rows for this coordinate from every Dask DataFrame and
            # convert them to NumPy arrays to allow for row-wise computing
            data_at_coords = {
                position: {
                    key: all_data[position][key]
                         .loc[(all_data[position][key].Lon == lon) &
                              (all_data[position][key].Lat == lat)]
                         .reset_index(drop=True)
                         .iloc[:, 3:]
                         .compute().values
                    for key in all_data[position]
                }
                for position in all_data
            }
            # Filter climate data for coords
            climate_ss = (climate.loc[(climate.Lon == lon) &
                                      (climate.Lat == lat),
                                      [variables]]  # placeholder for the climate variable columns
                          .copy().reset_index(drop=True).compute().values)
            # Process the grid cell
            df_grid = wd.calc_flux(climate_ss, data_at_coords, path)
            return df_grid

        # read files with dask
        base_path = path
        positions = [position1, position2]   # placeholders
        keys = [ten_keys]                    # placeholders (ten keys)
        file_keys = [ten_file_keys]          # placeholders (ten file keys)

        # Initialize mass dictionary
        all_data = {position: {} for position in positions}

        # Load the mass data
        for position in positions:
            for k, key in enumerate(keys):
                file_key = file_keys[k]
                file_path = os.path.join(base_path, f"file_{file_key}_{position.capitalize()}.txt")
                # Use Dask to read files
                all_data[position][key] = dd.read_csv(file_path, delim_whitespace=True)

        climate = dd.read_csv(os.path.join(base_path, 'climate.txt'))
        climate['coords'] = climate.apply(wd.make_tuple, axis=1)
        # The list of unique coordinates is small, so bring it to the client
        unique_coords = climate.coords.unique().compute()

        # Create an empty list to hold delayed tasks
        delayed_results = []
        # Parallelize the loop over unique_coords
        for coords in unique_coords:
            delayed_task = delayed(process_coords)(coords)
            delayed_results.append(delayed_task)

        # Compute all the tasks in parallel
        df_flux = dd.from_delayed(delayed_results).compute()

        # Do some things in the end

if __name__ == '__main__':
    main()
I know my code is not that efficient. I get the model output in 11 different files, because they are inputs to the coordinate-specific computation in process_coords, where I calculate fluxes for a number of years. df_flux has 200 rows for each of the roughly 40,000 coordinates.
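What I had in mind with each task writing its own file is roughly the following, so that the client only collects file paths instead of 40,000 × 200 rows of results. This is only a sketch reusing the names from the example above; the output directory and file naming are placeholders, and it assumes wd.calc_flux returns a pandas DataFrame:

import os

def process_coords_to_file(coords, out_dir):
    # Same per-coordinate computation as process_coords above, but the task
    # writes its own result to disk and only returns the file path
    df_grid = process_coords(coords)
    lon, lat = coords
    out_path = os.path.join(out_dir, f"flux_{lon}_{lat}.csv")   # placeholder naming
    df_grid.to_csv(out_path, index=False)
    return out_path

delayed_paths = [delayed(process_coords_to_file)(coords, '/path/to/output')
                 for coords in unique_coords]
written_files = dask.compute(*delayed_paths)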
Your example is a bit hard to read. I’m not sure what your real input data is. I’m also not sure why you are storing Dask DataFrames in the dict all_data, and it seems you are passing this dict to every delayed task?
I’m really not sure what you are trying to achieve, and I would need some simplification of your code to understand it. Perhaps you could show what you would do if it wasn’t distributed or big data?
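For instance, a plain pandas skeleton along these lines, just to illustrate the kind of simplification I mean (the file names and the flux computation here are completely made up):

import pandas as pd

# Made-up, non-distributed skeleton: read the tables with pandas,
# then loop over the coordinates one at a time
data = pd.read_csv("one_input_file.txt", sep=r"\s+")      # placeholder file
climate = pd.read_csv("climate.txt", sep=r"\s+")          # placeholder file

results = []
for (lon, lat), climate_cell in climate.groupby(["Lon", "Lat"]):
    cell = data[(data.Lon == lon) & (data.Lat == lat)]
    # ... compute the fluxes for this grid cell here ...
    results.append((lon, lat, len(cell)))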