Hi, I am a new dask user, so I wanted to post to this forum to see if anyone with more experience may be able to tell me if I am missing something here. This is my first time posting in a forum like this, so any suggestions are appreciated.
Dask version: 2022.2.0
netcdf4 version: 1.5.6
numpy version: 1.20.3
Task: Take a satellite image covering a large domain and split it into small images that I like to call patches. These patches are saved as netCDF files, and additional data is then added to them. This is run over a lot of images, so I am using Dask to run things in parallel and keep the computation time down.
Issue: When I run my existing code, I set client = Client(n_workers=8, threads_per_worker=1, memory_limit='5GB')
The memory on the workers themselves stays reasonable for my task (< 1 GB), but the main program's memory (local memory?) climbs. It does not climb much per iteration, but over a large number of iterations it adds up, going from about 1 GB to over 3 GB. This eventually causes the workers to fail, because the local memory and the worker memory together reach the 5 GB limit. It seems like the workers or futures are saving something to the local memory, and I cannot stop it from accumulating.
What I have tried: Aggressively deleting variables, and calling client.restart() and then deleting variables. Neither of these works (a rough sketch of that cleanup attempt is below).
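For reference, this is roughly what the "delete and restart" attempt looked like. It is a standalone sketch with placeholder work, not my exact code:

import gc
from dask.distributed import Client

def do_work(x):
    return x * 2  # placeholder for the real per-patch work

if __name__ == "__main__":
    client = Client(n_workers=8, threads_per_worker=1, memory_limit='5GB')
    for i in range(10):
        futures = client.map(do_work, range(100))
        results = client.gather(futures)
        del futures, results   # aggressively drop references to futures and gathered results
        client.restart()       # restart the workers to clear any lingering worker-side state
        gc.collect()           # force a garbage-collection pass in the main process

Even with this in place, the main process memory keeps growing.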
I’ve tried to create a minimal reproducible example (see below). This is my first time doing something like this, so I hope it makes sense; it seems a little ridiculous to have it save a bunch of random-number matrices, but it captures the workflow. I can share my larger code if needed, but it involves accessing data I cannot share (on a remote server). The memory usage does not get as high as in my full program, since I am creating pseudo arrays with random numbers and have cut out a lot of processing, but you can still see the memory increase even in this example.
Does anyone have any suggestions on what I can do to avoid the local memory increasing?
import sys, os
from datetime import datetime, timedelta
import numpy as np
from netCDF4 import Dataset
from dask.distributed import Client
import random
os.environ['HDF5_USE_FILE_LOCKING']="FALSE"
def get_list_of_dates(year, month, day, hour, minutes):
    dts = []
    for h in hour:
        for m in minutes:
            dt = datetime(year, month, day, h, m)
            dts.append(dt)
    return dts
def get_pseudo_patch_info(num_patches):  # gets information needed for patches in one datetime
    patch_info = []
    all_regions = ['northwest', 'west', 'west_north_central', 'southwest', 'east_north_central', 'south', 'southeast',
                   'central', 'northeast', 'oconus_west', 'oconus_east', 'oconus_north', 'oconus_south', 'oconus_other']
    for i in range(num_patches):
        region = random.sample(all_regions, 1)[0]
        center_lon = random.randint(-180, 180)
        center_lat = random.randint(-90, 90)
        percent_10C = random.randint(0, 100)
        percent_no_cov = random.randint(0, 100)
        info = [center_lon, center_lat, region, percent_10C, percent_no_cov]
        patch_info.append(info)
    return patch_info
def get_patch(patch_info, dt, rootoutdir):  # use patch information to save the patch
    center_lon = patch_info[0]
    center_lat = patch_info[1]
    region = patch_info[2]
    percent_10C = patch_info[3]
    percent_no_cov = patch_info[4]
    NCoutdir = dt.strftime(os.path.join(rootoutdir, ('data_netcdfs/%Y/%m/%d/' + region + '/')))
    os.makedirs(NCoutdir, exist_ok=True)  # exist_ok avoids a race when several workers create the same directory
    sat_array = np.random.randint(200, size=(200, 200))
    NCfilepath = os.path.join(NCoutdir, dt.strftime('%Y_%m_%d_%H_%M_%j_' + str(center_lon) + str(center_lat) + str(percent_10C) + str(percent_no_cov) + '.nc'))
    root = Dataset(NCfilepath, 'w', format='NETCDF4')
    root.description = 'Data Patch for Convective Initiation Model (CONUS)'
    root.center_latitude = center_lat
    root.center_longitude = center_lon
    root.center_region = region
    root.percent_60max_neg10C_greater30dBZ = percent_10C
    root.percent_60max_neg10C_noCoverage = percent_no_cov
    sat_group = root.createGroup('satellite')
    sat_group.createDimension('y', sat_array.shape[0])
    sat_group.createDimension('x', sat_array.shape[1])
    sat_data = sat_group.createVariable('Dummy', 'float32', ('y', 'x'))
    sat_data[:, :] = sat_array
    root.close()
    return NCfilepath
def add_to_nc(NCfilepath):  # add additional data to the netcdf for the patch (i.e. add radar to satellite patch)
    root = Dataset(NCfilepath, 'r+')
    radar_array = np.random.randint(200, size=(200, 200))
    radargroup = root.createGroup('radar')
    radargroup.createDimension('y', radar_array.shape[0])
    radargroup.createDimension('x', radar_array.shape[1])
    rad = radargroup.createVariable('Dummy2', 'float32', ('y', 'x'))
    rad[:, :] = radar_array
    root.close()
    return
if __name__ == "__main__":
    client = Client(n_workers=8, threads_per_worker=1, memory_limit='5GB')  # Call Dask Client
    year = 2019
    month = 6
    day = 22
    hour = np.arange(0, 23, 1)
    minutes = np.arange(1, 56, 5)
    dts = get_list_of_dates(year, month, day, hour, minutes)
    rootoutdir = os.environ['PWD'] + '/test/'
    print(rootoutdir)
    if not os.path.exists(rootoutdir):
        os.makedirs(rootoutdir)
    patch_info_dicts = {}  # dictionary where the key is the date and the value is a list of all the patches I want for that datetime
    for single_dt in dts:
        num_patches = random.randint(1, 210)
        dict_key = single_dt.strftime('%Y%m%d%H%M%S')
        info = get_pseudo_patch_info(num_patches)
        patch_info_dicts[dict_key] = info
        del single_dt, info, dict_key
    counter = 0
    for dt in dts:  # access the patches through the datetime key and collect the patches and all info needed
        print(counter, dt)
        dict_key = dt.strftime('%Y%m%d%H%M%S')
        patch_info = patch_info_dicts[dict_key]
        patch_res = client.map(get_patch, patch_info, dt=dt, rootoutdir=rootoutdir)
        radar_res = client.map(add_to_nc, patch_res)
        final_res = client.gather(radar_res)
        counter = counter + 1
        del patch_info, patch_res, radar_res, final_res
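In case it helps anyone reproduce the numbers below, this is roughly how the main-process memory can be watched between iterations. It assumes the third-party psutil package, which is not part of the example above:

import os
import psutil  # assumption: not used in my example above

def log_main_memory(label):
    # Print the resident set size (RSS) of the current (main/client) process in MB
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1e6
    print(f'{label}: main process RSS = {rss_mb:.1f} MB')

Calling something like log_main_memory(counter) at the end of each pass through the dts loop makes the growth easier to see from the terminal instead of a process monitor.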
The starting memory of the main program is 239324, and the final value is highlighted below (I can only upload one image as a new user; the other entries in the image show the memory of the 8 workers and/or PyCharm, etc.):