I have some code that creates an xarray dataset of dask-delayed data arrays corresponding to different spatial variables (e.g., temperature, humidity) over time. The data arrays are all delayed because they are loaded off disk and processed during the load in a way that is very computationally expensive and memory intensive (I’m regridding from a triangulation onto a regular grid).
One of the use cases for this code is to process all of these delayed data arrays (one per time and variable) and write them to disk. Because I cannot keep the entirety of the dataset in memory at once, I want to do this processing with dask workers, where each worker computes a data array, dumps it to disk, and then discards it.
Essentially I want xarray.map_blocks without the in-memory persist.
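To make that concrete, here is a small, self-contained sketch of the map_blocks pattern I mean (write_block is a hypothetical writer, and the dataset here is a tiny stand-in for mine); the catch is that computing the result still gathers every returned block back into memory, which is exactly the persist I want to avoid:

import numpy as np
import xarray as xr

# tiny stand-in dataset: 4 timesteps of a 10x10 field, one timestep per chunk
ds = xr.Dataset(
    {"test_var": (("concat_dim", "x", "y"), np.random.random((4, 10, 10)))},
    coords={"concat_dim": np.arange(4)},
).chunk({"concat_dim": 1, "x": -1, "y": -1})

def write_block(block):
    # hypothetical writer: side effect only, then hand the block back unchanged
    t = int(block["concat_dim"].values[0])
    block.to_netcdf(f"block_{t:03d}.nc")
    return block

# each block is a single timestep, but .compute() still pulls every returned
# block back to the client, which is the part I want to avoid
xr.map_blocks(write_block, ds, template=ds).compute()
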
I thought I could do this by iterating over the variable and time combinations in the xarray dataset and building a list of dask.delayed write-to-disk tasks.
This “works”, but not in the way I expected. What I’m seeing is that most or all of the data arrays are computed before the write-to-disk part is called, so I run out of memory.
I realize this approach is double-delayed. I’m very much open to a better way of doing this.
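For contrast, the shape I am after is a single layer of delayed calls, where the write is chained directly onto the load so each task is just load-then-write and nothing needs to stay resident. A minimal sketch (make_slice and write_slice are hypothetical stand-ins for my loader and raster writer):

import dask
import numpy as np

size, ts = 1000, 50

@dask.delayed
def make_slice(t):
    # stand-in for the expensive load + regrid of one timestep
    return np.random.random((size, size))

@dask.delayed
def write_slice(arr, t):
    # stand-in for the raster output; only None is returned to the client
    np.save(f"slice_{t:03d}.npy", arr)

# each task graph is just make_slice -> write_slice, so a worker can load a
# slice, write it, and release it before moving on to the next one
dask.compute(*[write_slice(make_slice(t), t) for t in range(ts)])

The downside is that this sketch bypasses the xarray dataset structure entirely.
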
I have a reproducer below that demonstrates the behaviour I’m seeing. The output shows that makearray is called many times before the output to disk even starts. On real-world data this behaviour causes me to run OOM.
from dask.distributed import Client, LocalCluster
import dask
import time
import xarray as xr
import dask.array as da
import numpy as np
size = 1000
ts = 50
@xr.register_dataset_accessor("chm")
class GeoAccessor:
    def __init__(self, xarray_obj):
        self._obj = xarray_obj

    def to_raster(self):
        def _dowork_toraster(df):
            # stand-in for the real write-to-disk step; .values forces the compute
            time.sleep(2)
            print('_dowork_toraster ... ' + str(df.values[0][0:5]))

        work = []
        for t in range(0, ts):
            for v in ["test_var"]:
                df = self._obj.isel(concat_dim=t)[v].copy()
                task = dask.delayed(_dowork_toraster)(df)
                work.append(task)
        dask.compute(*work)

@dask.delayed
def makearray():
    # stand-in for the expensive load/regrid; note it returns a dask array
    # from inside an already-delayed function
    print('makearray')
    df = da.from_array(np.random.random((size, size)))
    return df

if __name__ == '__main__':
    cluster = LocalCluster(n_workers=5, processes=True, threads_per_worker=1)
    client = Client(cluster)

    # build one lazily evaluated 2-D slice per timestep
    work = []
    for i in range(0, ts):
        dly = makearray()
        d = da.from_delayed(dly,
                            shape=(size, size),
                            dtype=np.dtype('float64'))

        x_center = np.linspace(0, size, num=size)
        y_center = np.linspace(0, size, num=size)

        tmp = xr.DataArray(d,
                           name=str(i),
                           coords={'y': y_center,
                                   'x': x_center},
                           dims=['x', 'y'])
        work.append(tmp)

    vars = {}
    vars['test_var'] = xr.concat(work, dim=[t for t in range(0, ts)])

    df = xr.Dataset(data_vars=vars)
    df = df.chunk({'concat_dim': 1, 'x': -1, 'y': -1})  # one timestep per chunk

    print(df)

    print(' ----------------- compute ----------------- ')
    df.chm.to_raster()
    print(' --------------- end compute --------------- ')