Memory leak with `@dask.delayed`

Hey there,
I am experiencing a memory leak when using `@dask.delayed` to parallelise a function that crops an `xarray.DataArray` with a polygon. Here is an MWE to reproduce it.


import rioxarray  # registers the .rio accessor on xarray objects
import xarray as xr
import numpy as np
from tqdm import tqdm
from shapely.geometry import Polygon
import dask
from dask.diagnostics import ProgressBar

def create_random_triangles(num_polygons, bounds):
    polygons = []
    for _ in range(num_polygons):
        points = np.random.rand(3, 2) * np.array(bounds)
        polygon = Polygon(points)
        polygons.append(polygon)
    return polygons

def create_synthetic_dataset(size, num_vars):
    data = {f'var{i}': (["x", "y"], np.random.rand(size, size)) for i in range(num_vars)}
    coords = {'x': np.arange(size), 'y': np.arange(size)}
    ds = xr.Dataset(data, coords=coords)
    ds.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    ds.rio.write_crs("epsg:4326", inplace=True)
    return ds.to_array()

@dask.delayed
def process_gdf(pol, dataset):
    with dataset.rio.clip([pol], dataset.rio.crs, all_touched=True) as cropped_dataset:
        numpy_array = cropped_dataset.to_numpy()
    return numpy_array

if __name__ == "__main__":
    num_polygons = 100000
    bounds = (100, 100)
    dataset_size = 5000
    num_vars = 20
    
    polygons = create_random_triangles(num_polygons, bounds)
    dataset = create_synthetic_dataset(dataset_size, num_vars)

    # "fast solution" from the xarray thread: one delayed task per polygon
    lazy_results = []
    for pol in polygons:
        lazy_results.append(process_gdf(pol, dataset))

    with ProgressBar():
        env_pred = dask.compute(*lazy_results)

The code works just fine, but memory usage blows up over time until the script crashes. I initially created a topic on the xarray forum, but I think it is a Dask issue. Any idea what I am doing wrong here?

Hi @vboussange,

How big is one numpy array returned by the process_gdf function?

Will the results of 100,000 calls to this function fit in your local process memory?

Hi @guillaumeeb,
In the provided example, the numpy arrays returned by process_gdf have a shape of about (20, 80, 80). These results should definitely fit within the local process memory, as a similar script that does not rely on dask.delayed seems to work.
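
For reference, the serial variant looks roughly like this (a simplified sketch, not my exact script): it just runs the same clipping logic in a plain loop.

# serial comparison: same clipping logic, no dask.delayed
results = []
for pol in tqdm(polygons):
    cropped = dataset.rio.clip([pol], dataset.rio.crs, all_touched=True)
    results.append(cropped.to_numpy())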

In the code above, you are trying to store 100,000 of these arrays in client memory, roughly 95 GiB of data in float64 if I'm correct. Do you have enough memory?
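
As a back-of-the-envelope check (assuming float64 and the (20, 80, 80) shape you mention):

import numpy as np

# 100,000 results of shape (20, 80, 80), all held in the client process at once
n_results = 100_000
bytes_per_result = 20 * 80 * 80 * np.dtype("float64").itemsize  # ~1 MiB each
print(f"~{n_results * bytes_per_result / 2**30:.0f} GiB")  # ~95 GiB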

Another thing you should try is to either start a LocalCluster and look at the Dashboard, or use a memory profiler, to see where your memory is blowing up.
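
Something along these lines should give you a dashboard to watch (the n_workers, threads_per_worker and memory_limit values are just placeholders to adapt to your machine):

from dask.distributed import Client, LocalCluster

# start a local cluster with an explicit per-worker memory limit (example values)
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="8GiB")
client = Client(cluster)
print(client.dashboard_link)  # open this URL to watch task progress and memory usage

# once a Client exists, dask.compute(*lazy_results) runs on this cluster by default

For a memory profile of the script itself, something like memory_profiler's mprof run / mprof plot would also do the job.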