Splitting big NetCDF file into hundreds of smaller files

Hi all,

I have 12 NetCDF files totalling 190 GB. Some of these files are too big, so I decided to split them along the Y axis before feeding them into our model. In our current implementation, we first split the big files into medium-sized files, then split those again along the Y axis using NetCDF operators. For instance, we initially have files with dimensions X=2560 and Y=1850. We split these into 10 separate input files so that each has dimensions X=256 and Y=185. At that point the files are small enough to fit into memory, so we split them further using NetCDF operators. In the end we’d have 1850 files with dimensions X=2560 and Y=1. However, this approach is time-consuming, error-prone, and not scalable, so we want to re-implement the whole process using Dask.

I rewrote the same logic using Dask, but instead of improving, performance got worse, and I got an error:

import dask
import os
import xarray as xr

from dask.distributed import LocalCluster, Client
from dask import delayed
from gcsfs import GCSFileSystem

@delayed
def process_y(ds, y):
    # Select the single column at index y and keep Y as a length-1 dimension.
    subset = ds.isel({"Y": y}).expand_dims("Y")
    return subset.to_netcdf(f"temp/hist_cli_{y}.nc", engine="h5netcdf")

bucket_path = "bucket_path"
file = "file.zarr"

cluster = LocalCluster(n_workers=os.cpu_count())
client = Client(cluster)
fs = GCSFileSystem(project="project_id")
bucket_mapping = fs.get_mapper(os.path.join(bucket_path, file), check=True)

ds = xr.open_zarr(bucket_mapping, chunks={"Y": -1, "X": "auto"}).persist()
y_dim = ds.Y.size

objs = []
for y in range(y_dim):
    objs.append(process_y(ds, y))

dask.compute(*objs)

This code doesn’t work for big input files and is slower than using NetCDF operators. It fails with:

KilledWorker: Attempted to run task 'finalize-139954c5-01c5-424f-9a09-aedbc696b38c' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:43191. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

The data that’s being written is collected on a single worker at the end (the finalize task), and since my workers don’t have that much memory, the code fails.

The issue I’m facing is probably an xarray problem rather than a Dask one, but I’m wondering if there is a workaround. I want to slice my data into single columns so that I can process them as batch jobs later on. Can you give me pointers on how to optimize my code and get rid of this error? I’m open to implementing different strategies that give the same result, as long as they are performant.

Thank you,

Hi @dogukanteber

So, just to understand: in your current workflow, you split the files once, then split and merge them afterwards? What tool are you using, and why aren’t you doing this in a single step?

I see you are using Zarr here; does this work with the initial NetCDF files? Why are you persisting the dataset into memory? And do you really need to write NetCDF, or could you write the final output with Zarr?
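On the NetCDF side, one pattern worth trying: instead of one delayed task per column, each of which drags the whole persisted dataset along, build the lazy single-column slices up front and hand them all to `xr.save_mfdataset` in a single call, so each worker streams only its own chunks to disk rather than funnelling everything through one finalize task. A minimal sketch on synthetic data (the sizes, paths, and variable name here are made up, not your real ones):

```python
import os
import tempfile

import numpy as np
import xarray as xr

# Tiny stand-in for the real store: Y=4, X=3, chunked one row per chunk.
ds = xr.Dataset(
    {"hist_cli": (("Y", "X"), np.arange(12.0).reshape(4, 3))}
).chunk({"Y": 1, "X": -1})

outdir = tempfile.mkdtemp()
# One lazy single-row dataset per Y index; nothing is loaded yet.
# isel(Y=[y]) keeps Y as a length-1 dimension, so no expand_dims is needed.
slices = [ds.isel(Y=[y]) for y in range(ds.sizes["Y"])]
paths = [os.path.join(outdir, f"hist_cli_{y}.nc") for y in range(ds.sizes["Y"])]

# All writes go into one graph; each task writes its own file directly.
xr.save_mfdataset(slices, paths)

print(len(os.listdir(outdir)))  # -> 4
```

This keeps the dataset lazy (no `persist`), which matters when the input doesn’t fit in cluster memory.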

I think you should take a look at the rechunker and kerchunk libraries; maybe they would help solve your problem. Kerchunk alone might work if your NetCDF files are chunked, but rechunking them might prove more performant in the end.