Hi all,
I have 12 NetCDF files totaling 190 GB. Some of these files are too big, so I decided to split them along the Y axis before feeding them into our model. In our current implementation, we first split these big files into medium-sized files and then split those again along the Y axis using the NetCDF operators (NCO). For instance, we initially have files with dimensions X=2560 and Y=1850. We split these into 10 separate input files so that each has dimensions X=256 and Y=185. At that point the files are small enough to fit into memory, so we split them with the NetCDF operators; in the end we have 1850 files with dimensions X=2560 and Y=1. However, this approach is time-consuming, error-prone, and not scalable, so we want to re-implement the whole process using Dask.
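For context, the per-column step we do today with the NetCDF operators boils down to something like the sketch below. I've written it as Python calling ncks just to keep everything in one language in this post; every path and file name is a placeholder.

import subprocess

def split_into_columns(in_path, n_y):
    # Placeholder illustration of the current NCO-based split, not our real script
    for y in range(n_y):
        out_path = f"temp/column_{y}.nc"
        # ncks -d Y,y,y extracts a hyperslab with a single index along Y
        subprocess.run(["ncks", "-O", "-d", f"Y,{y},{y}", in_path, out_path], check=True)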
I rewrote the same logic using Dask, but far from improving, the performance got worse, and I also hit an error:
import dask
import os
import xarray as xr
from dask.distributed import LocalCluster, Client
from dask import delayed
from gcsfs import GCSFileSystem

@delayed
def process_y(ds, y):
    # Take a single column along Y and write it out as its own NetCDF file
    subset = ds.isel({"Y": y}).expand_dims("Y")
    return subset.to_netcdf(f"temp/hist_cli_{y}.nc", engine="h5netcdf")

bucket_path = "bucket_path"
file = "file.zarr"

cluster = LocalCluster(n_workers=os.cpu_count())
client = Client(cluster)

fs = GCSFileSystem(project="project_id")
bucket_mapping = fs.get_mapper(os.path.join(bucket_path, file), check=True)

# Keep Y in a single chunk, let Dask pick the X chunking, then persist on the cluster
ds = xr.open_zarr(bucket_mapping, chunks={"Y": -1, "X": "auto"}).persist()

y_dim = ds.Y.size
objs = []
for y in range(y_dim):
    objs.append(process_y(ds, y))

dask.compute(*objs)
This code doesn’t work for big input files and is slower than the NetCDF operators. It fails with:
KilledWorker: Attempted to run task 'finalize-139954c5-01c5-424f-9a09-aedbc696b38c' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43191. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
The data being written is collected on a single worker at the end (the finalize task), and since my worker doesn’t have that much memory, the code fails.
The issue I’m facing is probably an xarray problem rather than a Dask one, but I’m wondering if there is a workaround. I want to slice my data into single columns so that I can process them as batch jobs later on. Can you give me pointers on how to optimize my code and get rid of this error? I’m open to implementing different strategies that give the same result, as long as they are performant.
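For what it’s worth, one direction I’ve been toying with but haven’t tested at all is to drop the persisted dataset entirely and have each task open the Zarr store itself, so the full dataset never has to sit on a single worker. The names below are the same placeholders as in the code above.

import os
import xarray as xr
from dask import delayed
from gcsfs import GCSFileSystem

@delayed
def process_y_standalone(y):
    # Reconnecting per task is probably wasteful, but keeps the sketch self-contained
    fs = GCSFileSystem(project="project_id")
    mapping = fs.get_mapper(os.path.join("bucket_path", "file.zarr"))
    # chunks=None: plain lazy Zarr access, no nested Dask graph inside the task
    ds = xr.open_zarr(mapping, chunks=None)
    # Only this one column is ever read into the task's memory
    subset = ds.isel({"Y": y}).expand_dims("Y").load()
    return subset.to_netcdf(f"temp/hist_cli_{y}.nc", engine="h5netcdf")

The driver loop would stay the same as above, just building process_y_standalone(y) instead of process_y(ds, y). I don’t know whether re-opening the store in every task is a reasonable pattern, though, so feedback on that idea is welcome too.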
Thank you,