Unmanaged memory high even after future collection

Hi,
I am trying to run a grid-interpolation function on multiple NetCDF files using the code below.
To do so, I map the function over a SLURM cluster, one task per file. To my understanding, each instance of the function is independent, so I collect the results with the as_completed iterator to free up memory on the workers as soon as each future finishes. However, the workers are not freeing the memory after returning their results: unmanaged memory accumulates (about 8 GB per execution) and eventually crashes the workers.

Each result is a 1000x12 dask dataframe of floats, and the final CSV output (containing all the results) is less than 2 MB.
In theory, nothing but the returned result should stay in memory. I tried manually deleting the variables before the return statement and calling gc.collect(), but that did not help. The pattern I tried looks roughly like the sketch below.
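
A minimal sketch of that cleanup attempt (the variable name field and the NetCDF variable 'T2' are placeholders, not my real ones):

import gc
import netCDF4 as nc

def extractor(input_file):
    with nc.Dataset(input_file, 'r') as wrfout:
        # hypothetical intermediate: a full gridded field read from the file
        field = wrfout.variables['T2'][:]   # 'T2' is a placeholder variable name
        result = field.mean()               # stand-in for the real interpolation
    # explicitly drop the large intermediate, then force a collection
    del field
    gc.collect()
    return result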
I also tried calling a trim_memory helper on the workers while the result-collecting loop was running (the glibc malloc_trim trick from the distributed documentation, sketched below) and experimented with the MALLOC_TRIM_THRESHOLD_ environment variable, but the unmanaged memory kept accumulating.
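
For reference, the trim_memory helper is the one suggested in the distributed documentation (Linux/glibc only):

import ctypes

def trim_memory() -> int:
    # ask glibc to hand freed arena memory back to the OS
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# run it on every worker from the client
dask_client.run(trim_memory)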

Thanks!


import pandas as pd
import datetime as dt
from glob import glob
import sys, logging, os
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, as_completed
import dask, time
import dask.dataframe as dd
import numpy as np

start = time.time()
static_parameters = (1, 2, 3, 4)
my_dir = "path_to_input_directory"  # should be a glob pattern matching the input files, e.g. ending in *.nc
### Core function of the code
def extractor(input_file):
    import netCDF4 as nc
    from bisect import bisect
    import pyproj
    import geopandas as gpd
    from shapely.geometry import Polygon
    from scipy.interpolate import griddata
    with nc.Dataset(input_file, 'r') as wrfout:
        # compute-heavy mapping/interpolation using the libraries above
        # (elided); produces result, a 1000x12 dask dataframe
        ...
    return result

if __name__ == '__main__':
    logging.basicConfig(filename=os.path.join(os.getcwd(),"message.log"),
                    filemode='w',
                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                    datefmt='%H:%M:%S',
                    level=logging.DEBUG)
    input_files = glob(my_dir)
    logging.info("[!] Number of files located: %s",len(input_files))
    
    cluster = SLURMCluster(queue='short',
                           cores=1, 
                           processes=2,
                           memory='32GiB',
                           walltime="02:00:00", 
                           log_directory=os.getcwd(),
                           local_directory="/tmp",
                           nanny=True)
    cluster.scale(jobs=8) 
    logging.info(cluster.job_script())
    logging.info(f'the dashboard link: {cluster.dashboard_link}')
    with Client(cluster) as dask_client:
        dask_client.get_versions(check=True)
        dask_client.amm.start() # Activate the Memory Manager
        dask_client.forward_logging()
        logging.info("[+] Launching the compute command")
        futures = dask_client.map(extractor, input_files)  # submit one task per file
        completed = as_completed(futures, with_results=True)
        Output_Data=[]
        logging.info("[+] Collecting the compute results")
        for future, result in completed:
            logging.info("[+] Received a result:")
            logging.info(result)
            Output_Data.append(result)
            logging.info(f"Reported size of result: {dask.sizeof.sizeof(result)}")
            logging.info(f"[-] Ending future {future}")
            logging.info(sys.getsizeof(result))
            future.release()
            del future           
        Output_Data = dd.concat(Output_Data)
        Output_Data.to_csv("output-results.csv", single_file=True)  # write one CSV file
        dask_client.shutdown()  # tears down scheduler and workers; the with-block then closes the client
    cluster.close()
    logging.info(f'> computation time: {time.time() - start:.1f} s')

Hi @C130PA5, welcome to Dask community!

If the results of your extractor function are small enough, you shouldn't even have to release them or use as_completed; a plain map plus gather would do, as sketched below.
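
A minimal sketch, reusing the names from the post above:

# small results: submit everything, then gather all at once
futures = dask_client.map(extractor, input_files)
results = dask_client.gather(futures)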

The only problem I can imagine is memory not being freed by the netCDF4 library, or by the code inside the with statement. What happens when you profile a single execution of the extractor function? Or when you run it on several files in a single Python process, without Dask? Something like the sketch below would show whether the leak exists outside Dask.
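
A minimal sketch of such a test, assuming a glob pattern for the test files (on Linux, the resource module reports peak RSS in KB):

import gc
import resource
from glob import glob

def rss_mb() -> float:
    # peak resident set size of this process; note this is a high-water mark
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024

for path in sorted(glob("path_to_input_directory/*.nc")):  # placeholder pattern
    result = extractor(path)
    gc.collect()
    print(f"{path}: RSS after extractor = {rss_mb():.1f} MB")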

Hi @guillaumeeb, thank you for the assistance. I tested my code both single-threaded, without any additional libraries, and multi-process with multiprocessing.Pool. The code keeps accumulating memory either way, so clearly it is not a Dask memory issue and I'll close the topic. The problem seems to come from the interpolation modules. I printed some diagnostics during execution using the resource module and Pool.map (the instrumentation is sketched after the output below):

1st Iteration: Memory before opening netcdf file: 167.50 MB
1st Iteration: Memory before interpolation: 2315.30 MB
1st Iteration: Memory after interpolation: 11649.70 MB
1st Iteration: Memory after garbage collect: 11649.70 MB (<- I tried to manually delete all unwanted variables inside the function)

2nd Iteration: Memory before opening netcdf file: 11649.70 MB
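
For reference, the instrumentation was along these lines (file paths and the pool size are placeholders; note that ru_maxrss is a high-water mark, so it never decreases even when memory is freed):

import gc
import resource
from multiprocessing import Pool

def rss_mb() -> float:
    # peak resident set size; ru_maxrss is in KB on Linux
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024

def instrumented_extractor(input_file):
    import netCDF4 as nc
    print(f"Memory before opening netcdf file: {rss_mb():.2f} MB")
    with nc.Dataset(input_file, 'r') as wrfout:
        print(f"Memory before interpolation: {rss_mb():.2f} MB")
        # ... grid interpolation step (elided) ...
        print(f"Memory after interpolation: {rss_mb():.2f} MB")
    gc.collect()
    print(f"Memory after garbage collect: {rss_mb():.2f} MB")

if __name__ == '__main__':
    with Pool(processes=4) as pool:  # pool size is arbitrary here
        pool.map(instrumented_extractor, ["file1.nc", "file2.nc"])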
