Hello,
I would like to write a short benchmark script to test various Dask.distributed Client configurations, but I don’t get the expected results: cache and memory do not seem to be cleared properly between runs.
I’d like to resample half-hourly data to hourly as a test:
# Perform the resampling computation
imerg_resampled = imerg_sliced.resample(time='1H').sum(method="blockwise").compute()
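For reference, here is a minimal self-contained version of that resample on synthetic half-hourly data (this assumes flox is installed, since xarray forwards method="blockwise" to flox; the shapes and values here are made up):
# Minimal synthetic reproduction (assumes flox is installed, since
# method="blockwise" is forwarded to flox by xarray's resample reduction)
import numpy as np
import pandas as pd
import xarray as xr

t = pd.date_range("2002-01-01", periods=96, freq="30min")  # 2 days, half-hourly
da = xr.DataArray(
    np.random.rand(96, 4, 4),
    dims=("time", "lat", "lon"),
    coords={"time": t},
    name="precipitationCal",
).chunk({"time": 40})  # same time chunking as the real dataset

hourly = da.resample(time="1H").sum(method="blockwise").compute()
print(hourly.sizes)  # 96 half-hourly steps -> 48 hourly sums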
For this process, I’d like to test several Client configurations :
# Test configurations
configs = [
    {'n_workers': 8, 'threads_per_worker': 1, 'memory_limit': '1.5GiB'},
    {'n_workers': 6, 'threads_per_worker': 2, 'memory_limit': '2GiB'},
    {'n_workers': 4, 'threads_per_worker': 2, 'memory_limit': '3GiB'},
    {'n_workers': 10, 'threads_per_worker': 1, 'memory_limit': '1.2GiB'},
    {'n_workers': 5, 'threads_per_worker': 2, 'memory_limit': '2.4GiB'}
]
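(Note that all five configurations allocate the same 12 GiB in total — n_workers × memory_limit — so only the split between processes and threads varies.)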
The problem is that, even though I use some GC functions:
# Clear memory and garbage collect
del imerg_resampled, imerg_sliced, imerg
client.shutdown() # Fully close Dask client to avoid lingering processes
client.close()
gc.collect() # Force garbage collection to free memory
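For completeness, here is the per-iteration teardown pattern I’m considering as an alternative, with the client as a context manager so the cluster is closed even if a run fails — a sketch only, assuming the same configs list as above (client.run(gc.collect) triggers a collection on each worker, but I’m not sure this is enough for a truly cold start):
# Sketch: per-configuration setup/teardown with the client as a context
# manager, so the cluster is closed even if the run raises
from dask.distributed import Client
import gc

for config in configs:
    with Client(
        n_workers=config['n_workers'],
        threads_per_worker=config['threads_per_worker'],
        memory_limit=config['memory_limit'],
    ) as client:
        client.run(gc.collect)  # force a collection on every worker
        # ... open, slice, resample and time the compute here ...
    gc.collect()  # collect in the driver process after the cluster is gone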
It does not look like the cache and memory are properly cleared:
Full script:
import os
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client
import time
import gc # for garbage collection
# Define the IMERG dataset path and file pattern
imerg_path = '/media/pmauger/LaCie/01_DATA/IMERG'
imerg_files = os.path.join(imerg_path, "*.nc4")
# Load WRF latitude and longitude vectors and create 2D meshgrid
wrf_path = "/media/pmauger/LaCie/01_DATA/WRF/V2"
lat_lon_d02 = np.load(os.path.join(wrf_path, "lat_lon_d02.npz"))
latitude_norm = lat_lon_d02['latitude']
longitude_norm = lat_lon_d02['longitude']
# Define spatial boundaries
lat_min, lat_max = np.min(latitude_norm) - 0.1, np.max(latitude_norm) + 0.1
lon_min, lon_max = np.min(longitude_norm) - 0.1, np.max(longitude_norm) + 0.1
# Test configurations
configs = [
    {'n_workers': 8, 'threads_per_worker': 1, 'memory_limit': '1.5GiB'},
    {'n_workers': 6, 'threads_per_worker': 2, 'memory_limit': '2GiB'},
    {'n_workers': 4, 'threads_per_worker': 2, 'memory_limit': '3GiB'},
    {'n_workers': 10, 'threads_per_worker': 1, 'memory_limit': '1.2GiB'},
    {'n_workers': 5, 'threads_per_worker': 2, 'memory_limit': '2.4GiB'}
]
# Results storage
times = []
configs_used = []
# Loop through configurations
for config in configs:
print(f"\nTesting configuration: {config}")
# Initialize Dask client with specified configuration
client = Client(
n_workers=config['n_workers'],
threads_per_worker=config['threads_per_worker'],
memory_limit=config['memory_limit']
)
# Force computation to load data
imerg = xr.open_mfdataset(imerg_files, chunks={'time': 40}, data_vars="minimal")[['precipitationCal']]
# Slice the IMERG dataset
imerg_sliced = imerg.where(
(imerg.lat >= lat_min) & (imerg.lat <= lat_max) &
(imerg.lon >= lon_min) & (imerg.lon <= lon_max),
drop=True
)
# Select a 6-month time range
time_slice = slice("2002-01-01", "2002-01-31")
imerg_sliced = imerg_sliced.sel(time=time_slice)
# Print dataset info to check the size
print("Dataset info before computation:")
# print(imerg_sliced)
# Start timing
start_time = time.time()
print("Data loaded, performing resampling...")
# Save the resampled dataset to a file
resampled_filename = f"resampled_data_{config['n_workers']}_{config['threads_per_worker']}.nc"
imerg_resampled.to_netcdf(resampled_filename)
print(f"Resampled data saved to {resampled_filename}")
end_time = time.time() # End timing
# Store the configuration and time taken
elapsed_time = end_time - start_time
times.append(elapsed_time)
configs_used.append(config)
print(f"Configuration {config} took {elapsed_time:.2f} seconds")
# Clear memory and garbage collect
del imerg_resampled, imerg_sliced, imerg
client.shutdown() # Fully close Dask client to avoid lingering processes
client.close()
gc.collect() # Force garbage collection to free memory
# Plot the results
n_workers = [cfg['n_workers'] for cfg in configs_used]
threads_per_worker = [cfg['threads_per_worker'] for cfg in configs_used]
memory_limits = [cfg['memory_limit'] for cfg in configs_used]
plt.figure(figsize=(12, 6))
plt.bar(range(len(times)), times, color='skyblue')
plt.xticks(range(len(times)),
           [f"Workers: {n}, Threads: {t}, Mem: {m}"
            for n, t, m in zip(n_workers, threads_per_worker, memory_limits)],
           rotation=45, ha='right')
plt.ylabel("Computation Time (s)")
plt.title("Dask Resampling Performance for Different Configurations")
plt.tight_layout()
plt.show()
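To check whether memory really is released between runs, I log the resident memory of the main process after each teardown — a minimal sketch assuming psutil is installed (the worker processes should be gone at that point):
# Sketch: log resident memory of the driver process after each teardown
# (assumes psutil is installed)
import os
import psutil

rss_mib = psutil.Process(os.getpid()).memory_info().rss / 2**20
print(f"RSS after teardown: {rss_mib:.0f} MiB")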
How can I effectively clear cache and memory so that each configuration in the loop starts fresh, without re-using any data, and really compare the runtimes of the different configurations?
Thanks for your help