Hi,
I am trying to run a grid interpolation function over multiple netCDF files with the code below.
To do so, I map the function on a SLURM cluster, one task per file. To my understanding, each instance of the function is independent, so I collect the results with the as_completed method in order to free up the workers' memory as soon as a future finishes. However, the workers are not releasing the memory after returning their results: unmanaged memory accumulates (8 GB per execution) and eventually brings the workers down.
Each result is a 1000x12 dask dataframe of floats, and the final CSV output (containing all the results) is under 2 MB.
Apart from the returned result, none of the other variables should stay in memory. I tried manually deleting them before the return statement and calling gc.collect(), but that did not help.
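Concretely, the tail end of extractor looked roughly like this (compute_heavy_interpolation, build_output_frame, and the variable names are just placeholders for my actual code):

import gc

def extractor(input_file):
    interpolated = compute_heavy_interpolation(input_file)  # placeholder for the real interpolation work
    result = build_output_frame(interpolated)               # placeholder; builds the small 1000x12 dataframe
    del interpolated   # explicitly drop the large intermediate
    gc.collect()       # force a collection before returning
    return result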
I also tried running a trim_memory helper on the workers during the result-collecting loop, and experimented with the MALLOC_TRIM_THRESHOLD_ environment variable, but the unmanaged memory kept accumulating.
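For reference, the trim_memory helper is the one from the Dask documentation on unmanaged memory (glibc/Linux only), which I ran from inside the collecting loop:

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)   # ask glibc to return freed arenas to the OS

# called periodically while collecting results:
dask_client.run(trim_memory)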
Thanks!
import pandas as pd
import datetime as dt
from glob import glob
import sys, logging, os
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, as_completed
import dask, time
import dask.dataframe as dd
import numpy as np
start = time.time()
static_parameters = (1, 2, 3, 4)
my_dir = "path_to_input_directory"  # glob pattern matching the input netCDF files
### Core function of the code
def extractor(input_file):
    # heavy dependencies are imported on the worker, not the client
    import netCDF4 as nc
    from bisect import bisect
    import pyproj
    import geopandas as gpd
    from shapely.geometry import Polygon
    from scipy.interpolate import griddata

    with nc.Dataset(input_file, 'r') as wrfout:
        """Some compute-heavy mapping/interpolation using the libraries above;
        builds `result`, a dask dataframe of shape (1000, 12)."""
    return result
if __name__ == '__main__':
    logging.basicConfig(filename=os.path.join(os.getcwd(), "message.log"),
                        filemode='w',
                        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%H:%M:%S',
                        level=logging.DEBUG)
    input_files = glob(my_dir)
    logging.info("[!] Number of files located: %s", len(input_files))
    cluster = SLURMCluster(queue='short',
                           cores=1,
                           processes=2,
                           memory='32GiB',
                           walltime="02:00:00",
                           log_directory=os.getcwd(),
                           local_directory="/tmp",
                           nanny=True)
    cluster.scale(jobs=8)
    logging.info(cluster.job_script())
    logging.info(f'the dashboard link: {cluster.dashboard_link}')
    with Client(cluster) as dask_client:
        dask_client.get_versions(check=True)
        dask_client.amm.start()  # activate the Active Memory Manager
        dask_client.forward_logging()

        logging.info("[+] Launching the compute command")
        futures = dask_client.map(extractor, input_files)

        Output_Data = []
        logging.info("[+] Collecting the compute results")
        for future, result in as_completed(futures, with_results=True):
            logging.info("[+] Received a result:")
            logging.info(result)
            Output_Data.append(result)
            logging.info(f"Reported size of result: {dask.sizeof.sizeof(result)}")
            logging.info(f"[-] Ending future {future}")
            logging.info(sys.getsizeof(result))
            future.release()  # drop our reference so the scheduler can forget the result
            del future

        Output_Data = dd.concat(Output_Data)
        Output_Data.to_csv("output-results.csv")
        dask_client.shutdown()  # the with-block also closes the client on exit

    cluster.close()
    logging.info(f'> computation time: {time.time() - start}')