Dask.distributed configurations benchmark

Hello,

I would like to write a short benchmark script to test various Dask.distributed Client configurations, but I don’t get the expected results: cache and memory do not seem to be cleared properly between runs.

I’d like to resample half-hourly data to hourly data as a test:

# Perform the resampling computation
imerg_resampled = imerg_sliced.resample(time='1H').sum(method="blockwise").compute()
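As a sanity check on what this resample should produce, here is a tiny stand-in using plain pandas with synthetic values (not IMERG data): four half-hourly values collapse into two hourly sums.

```python
import pandas as pd

# Four half-hourly values starting at midnight; .resample("1h").sum()
# groups them into two hourly bins, mirroring the xarray call above.
s = pd.Series(
    [1.0, 2.0, 3.0, 4.0],
    index=pd.date_range("2002-01-01 00:00", periods=4, freq="30min"),
)
hourly = s.resample("1h").sum()
print(hourly.tolist())  # → [3.0, 7.0]
```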

For this process, I’d like to test several Client configurations:

# Test configurations
configs = [
    {'n_workers': 8, 'threads_per_worker': 1, 'memory_limit': '1.5GiB'},
    {'n_workers': 6, 'threads_per_worker': 2, 'memory_limit': '2GiB'},
    {'n_workers': 4, 'threads_per_worker': 2, 'memory_limit': '3GiB'},
    {'n_workers': 10, 'threads_per_worker': 1, 'memory_limit': '1.2GiB'},
    {'n_workers': 5, 'threads_per_worker': 2, 'memory_limit': '2.4GiB'}
]

The problem is, even though I call some cleanup and GC functions:

# Clear memory and garbage collect
del imerg_resampled, imerg_sliced, imerg
client.shutdown()  # Fully close Dask client to avoid lingering processes
client.close()
gc.collect()  # Force garbage collection to free memory

it does not look like the cache and memory are properly cleared: every configuration after the first one runs much faster, regardless of its settings.

Full script:
import os
import xarray as xr
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client
import time
import gc  # for garbage collection

# Define the IMERG dataset path and file pattern
imerg_path = '/media/pmauger/LaCie/01_DATA/IMERG'
imerg_files = os.path.join(imerg_path, "*.nc4")

# Load WRF latitude and longitude vectors and create 2D meshgrid
wrf_path = "/media/pmauger/LaCie/01_DATA/WRF/V2"
lat_lon_d02 = np.load(os.path.join(wrf_path, "lat_lon_d02.npz"))
latitude_norm = lat_lon_d02['latitude']
longitude_norm = lat_lon_d02['longitude']

# Define spatial boundaries
lat_min, lat_max = np.min(latitude_norm) - 0.1, np.max(latitude_norm) + 0.1
lon_min, lon_max = np.min(longitude_norm) - 0.1, np.max(longitude_norm) + 0.1

# Test configurations
configs = [
    {'n_workers': 8, 'threads_per_worker': 1, 'memory_limit': '1.5GiB'},
    {'n_workers': 6, 'threads_per_worker': 2, 'memory_limit': '2GiB'},
    {'n_workers': 4, 'threads_per_worker': 2, 'memory_limit': '3GiB'},
    {'n_workers': 10, 'threads_per_worker': 1, 'memory_limit': '1.2GiB'},
    {'n_workers': 5, 'threads_per_worker': 2, 'memory_limit': '2.4GiB'}
]

# Results storage
times = []
configs_used = []

# Loop through configurations
for config in configs:
    print(f"\nTesting configuration: {config}")
    
    # Initialize Dask client with specified configuration
    client = Client(
        n_workers=config['n_workers'],
        threads_per_worker=config['threads_per_worker'],
        memory_limit=config['memory_limit']
    )
    
    # Lazily open the IMERG dataset (computation is deferred until .compute())
    imerg = xr.open_mfdataset(imerg_files, chunks={'time': 40}, data_vars="minimal")[['precipitationCal']]
    
    # Slice the IMERG dataset
    imerg_sliced = imerg.where(
        (imerg.lat >= lat_min) & (imerg.lat <= lat_max) &
        (imerg.lon >= lon_min) & (imerg.lon <= lon_max),
        drop=True
    )
    
    # Select a one-month time range
    time_slice = slice("2002-01-01", "2002-01-31")
    imerg_sliced = imerg_sliced.sel(time=time_slice)

    # Print dataset info to check the size
    print("Dataset info before computation:")
    # print(imerg_sliced)

    # Start timing
    start_time = time.time()  

    print("Data loaded, performing resampling...")

    # Perform the resampling computation
    imerg_resampled = imerg_sliced.resample(time='1H').sum(method="blockwise").compute()

    # Save the resampled dataset to a file
    resampled_filename = f"resampled_data_{config['n_workers']}_{config['threads_per_worker']}.nc"
    imerg_resampled.to_netcdf(resampled_filename)
    print(f"Resampled data saved to {resampled_filename}")

    end_time = time.time()  # End timing
    
    # Store the configuration and time taken
    elapsed_time = end_time - start_time
    times.append(elapsed_time)
    configs_used.append(config)
    
    print(f"Configuration {config} took {elapsed_time:.2f} seconds")
    
    # Clear memory and garbage collect
    del imerg_resampled, imerg_sliced, imerg
    client.shutdown()  # Fully close Dask client to avoid lingering processes
    client.close()
    gc.collect()  # Force garbage collection to free memory

# Plot the results
n_workers = [cfg['n_workers'] for cfg in configs_used]
threads_per_worker = [cfg['threads_per_worker'] for cfg in configs_used]
memory_limits = [cfg['memory_limit'] for cfg in configs_used]

plt.figure(figsize=(12, 6))
plt.bar(range(len(times)), times, color='skyblue')
plt.xticks(range(len(times)), 
           [f"Workers: {n}, Threads: {t}, Mem: {m}" 
            for n, t, m in zip(n_workers, threads_per_worker, memory_limits)], 
           rotation=45, ha='right')
plt.ylabel("Computation Time (s)")
plt.title("Dask Resampling Performance for Different Configurations")
plt.tight_layout()
plt.show()

How can I effectively clear the cache and memory between iterations, so that each configuration starts fresh without re-using any data and the runtimes of the different configurations are really comparable?
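One workaround I’m considering is to launch every configuration in a fresh Python subprocess, so nothing in-process (Dask client state, xarray caches, Python objects) can carry over between runs. A minimal sketch; the worker body here is only a placeholder where the real `open_mfdataset` / resample / `to_netcdf` steps would go:

```python
import json
import subprocess
import sys
import textwrap

# Placeholder worker script: a real one would build the Client from the
# config, open the dataset, resample, and write the NetCDF file.
WORKER = textwrap.dedent("""
    import json, sys, time
    config = json.loads(sys.argv[1])
    start = time.time()
    # ... open_mfdataset / resample / to_netcdf would go here ...
    print(json.dumps({"config": config, "seconds": time.time() - start}))
""")

def run_one(config):
    # With `python -c CODE ARG`, sys.argv inside CODE is ['-c', ARG],
    # so the config dict round-trips through JSON into the subprocess.
    out = subprocess.run(
        [sys.executable, "-c", WORKER, json.dumps(config)],
        capture_output=True, text=True, check=True,
    )
    return json.loads(out.stdout)

result = run_one({"n_workers": 2, "threads_per_worker": 1})
print(result["config"])  # the config round-trips through the subprocess
```

Each call starts a cold interpreter, so only the OS-level page cache could still make later runs faster.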

Thanks for your help :pray:

Hi @Arty,

I’m not sure what could explain this difference between the first test and the others, at least on the Dask side. It seems to me that you properly delete the objects and the LocalCluster, so there is no reason for the computation to be so much faster after the first try.

Maybe it’s a cache effect on the hard-drive and memory side, and your computation is IO-bound?
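One way to test that hypothesis is to time the same file read twice: if the second pass is dramatically faster, the OS page cache is serving the data, and only the first Dask configuration pays the real IO cost. A small sketch (the path would be one of your `.nc4` files):

```python
import time

def timed_read(path, chunk=1 << 20):
    # Stream the file in 1 MiB chunks and return the wall time taken.
    start = time.time()
    with open(path, "rb") as f:
        while f.read(chunk):
            pass
    return time.time() - start

# cold = timed_read("some_imerg_file.nc4")  # first pass hits the disk
# warm = timed_read("some_imerg_file.nc4")  # second pass may hit the page cache
```

If `warm` comes out far smaller than `cold`, the benchmark loop should either drop OS caches between runs or discard the first measurement.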

Thanks Guillaume. I’ll put that aside; it was just a simple performance test for optimization, and I really don’t have any clue how to correct it.
