Delayed dataframe computation

I have some code that creates an xarray dataset of dask-delayed data arrays corresponding to different spatial variables (e.g., temperature, humidity) over time. The data arrays are all delayed because they are loaded off disk and processed during the load in a way that is very computationally expensive and memory intensive (I’m regridding a triangulation onto a regular grid).

One of the use cases of this code is to process all of these delayed data arrays – for each time and variable – and write them to disk. Because I cannot keep the entirety of the dataset in memory at once, I want to do this processing with dask workers, where each worker computes a data array, dumps it to disk, and then discards it.

Essentially I want xarray.map_blocks without the in-memory persist.
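
To make that concrete, the pattern I have in mind looks roughly like this (a sketch only – write_block, the output path, and ds are placeholders, not code I’m actually running; ds stands for the chunked dataset, called df in the reproducer below):

def write_block(block):
    # write this one in-memory block to disk, then return something tiny so
    # the full block does not have to stay resident on the worker
    block.to_netcdf('block_out.nc')  # placeholder output path
    return block.isel(x=slice(0, 1), y=slice(0, 1))

written = ds.map_blocks(write_block)
written.compute()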

I thought I could do this by iterating over the variable and time combinations in the xarray dataset and creating a list of dask.delayed write-to-disk tasks.

This “works”, but not in the way I expected: most/all of the data arrays are computed before the write-to-disk step is called, so I run out of memory.

I realize this approach is double-delayed – I’m wrapping dask.delayed around a DataArray that is already lazy – and I’m very much open to a better way of doing this.
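
One alternative I can imagine (sketch only, with placeholder load/write functions; I haven’t verified that this avoids the problem) is to skip the xarray/dask-array layer for the output step and chain the write directly onto a per-timestep delayed load, so each task materialises exactly one array:

import dask

ts = 50  # number of timesteps, matching the reproducer below

@dask.delayed
def load_one_timestep(t):
    # placeholder for the expensive vtu load + regrid of timestep t
    ...

@dask.delayed
def write_one_timestep(arr, t):
    # placeholder for writing timestep t to disk, then dropping it
    ...

tasks = [write_one_timestep(load_one_timestep(t), t) for t in range(ts)]
dask.compute(*tasks)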

I have a reproducer below that demonstrates the behaviour I’m seeing. The output shows that makearray is called many times before the write to disk even starts. On real-world data this behaviour causes me to run out of memory (OOM).


from dask.distributed import Client, LocalCluster
import dask
import time
import xarray as xr
import dask.array as da
import numpy as np

size = 1000  # grid dimension: each timestep is a size x size array
ts = 50      # number of timesteps


@xr.register_dataset_accessor("chm")
class GeoAccessor:
    def __init__(self, xarray_obj):
        self._obj = xarray_obj

    def to_raster(self):
        def _dowork_toraster(df):
            # stand-in for the real raster write; accessing .values forces
            # this single timestep to be computed
            time.sleep(2)
            print('_dowork_toraster ... ' + str(df.values[0][0:5]))

        work = []

        for t in range(0, ts):
            for v in ["test_var"]:
                df = self._obj.isel(concat_dim=t)[v].copy()
                task = dask.delayed(_dowork_toraster)(df)
                work.append(task)

        dask.compute(*work)


@dask.delayed
def makearray():
    # stand-in for the expensive per-timestep load + regrid; return a
    # concrete NumPy array so da.from_delayed receives real data
    print('makearray')
    return np.random.random((size, size))

if __name__ == '__main__':

    cluster = LocalCluster(n_workers=5, processes=True, threads_per_worker=1)
    client = Client(cluster)

    work = []
    for i in range(0, ts):
        dly = makearray()

        d = da.from_delayed(dly,
                            shape=(size, size),
                            dtype=np.dtype('float64'))

        x_center = np.linspace(0, size, num=size)
        y_center = np.linspace(0, size, num=size)

        tmp = xr.DataArray(d,
                           name=str(i),
                           coords={'y': y_center,
                                   'x': x_center},
                           dims=['x', 'y'])

        work.append(tmp)

    data_vars = {}
    data_vars['test_var'] = xr.concat(work, dim=[t for t in range(0, ts)])
    df = xr.Dataset(data_vars=data_vars)
    # one chunk per timestep so each to-disk task only needs one array
    df = df.chunk({'concat_dim': 1, 'x': -1, 'y': -1})

    print(df)
    print(' ----------------- compute ----------------- ')
    df.chm.to_raster()
    print(' --------------- end compute --------------- ')


Hi! Great question. I suspect you’re correct that delaying a delayed is one problem. However, I’m curious if there’s another way to approach this. How are the data stored on disk? Zarr, Parquet, CSV, etc.? Dask has a number of ways to load data from different formats; I wonder if another one of those might be appropriate here?


Hi, thanks for the thoughts.

The data are stored on disk in the ParaView unstructured grid format (.vtu). I therefore have to regrid those .vtu files on the fly from the triangular mesh to the structured grid of the xarray dataset. This is /fine/, as I ultimately output the regridded data as geospatial raster files (GeoTIFF).
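
For context, the per-file regrid is conceptually something like this (a rough sketch of the idea rather than my actual code; meshio/scipy are just one possible toolchain, and regrid_vtu and the variable names are made up):

import meshio
import numpy as np
import xarray as xr
from scipy.interpolate import griddata

def regrid_vtu(path, var, x_grid, y_grid):
    # read the unstructured (triangular) mesh and pull out one point variable
    mesh = meshio.read(path)
    points = mesh.points[:, :2]
    values = mesh.point_data[var]

    # interpolate the scattered mesh values onto the structured grid
    xx, yy = np.meshgrid(x_grid, y_grid)
    gridded = griddata(points, values, (xx, yy), method='linear')

    return xr.DataArray(gridded,
                        coords={'y': y_grid, 'x': x_grid},
                        dims=['y', 'x'])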