Hey there,
I am experiencing a memory leak when using @dask.delayed to parallelise a function that crops an xarray.DataArray with a polygon. Here is an MWE to reproduce it:
import rioxarray
import xarray as xr
import numpy as np
from tqdm import tqdm
from shapely.geometry import Polygon
import dask
from dask.diagnostics import ProgressBar
def create_random_triangles(num_polygons, bounds):
    # Generate random triangular polygons within the given bounds
    polygons = []
    for _ in range(num_polygons):
        points = np.random.rand(3, 2) * np.array(bounds)
        polygon = Polygon(points)
        polygons.append(polygon)
    return polygons

def create_synthetic_dataset(size, num_vars):
    # Build a synthetic multi-variable dataset with spatial dims and a CRS
    data = {f'var{i}': (["x", "y"], np.random.rand(size, size)) for i in range(num_vars)}
    coords = {'x': np.arange(size), 'y': np.arange(size)}
    ds = xr.Dataset(data, coords=coords)
    ds.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=True)
    ds.rio.write_crs("epsg:4326", inplace=True)
    return ds.to_array()

@dask.delayed
def process_gdf(pol, dataset):
    # Clip the dataset to the polygon and return the result as a NumPy array
    with dataset.rio.clip([pol], dataset.rio.crs, all_touched=True) as cropped_dataset:
        numpy_array = cropped_dataset.to_numpy()
        return numpy_array

if __name__ == "__main__":
    num_polygons = 100000
    bounds = (100, 100)
    dataset_size = 5000
    num_vars = 20

    polygons = create_random_triangles(num_polygons, bounds)
    dataset = create_synthetic_dataset(dataset_size, num_vars)

    # fast solution
    lazy_results = []
    for pol in polygons:
        lazy_results.append(process_gdf(pol, dataset))

    with ProgressBar():
        env_pred = dask.compute(*lazy_results)
The code works just fine, but memory usage blows up over time until the script crashes. I initially created a topic on the xarray forum, but I now think it is a Dask issue. Any idea what I am doing wrong here?
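For completeness, one workaround I considered is computing the delayed results in smaller batches instead of all at once. A rough sketch is below (the batch size of 1000 is an arbitrary choice on my part), but I would still like to understand why memory keeps growing in the straightforward version.

# rough sketch: compute the delayed tasks in batches rather than all at once
batch_size = 1000  # arbitrary; tune to available memory
results = []
with ProgressBar():
    for i in range(0, len(lazy_results), batch_size):
        batch = lazy_results[i:i + batch_size]
        results.extend(dask.compute(*batch))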