Merging hundreds of NetCDF files into a single big NetCDF file on HPC Cluster

Hi all,

We have a small-scale HPC cluster on GCP. We use Slurm as the job scheduler and Lustre as the parallel file system. Since the data we work with is too big to process in one go, we first split the input data into smaller batches and run those batches separately. The results end up in their corresponding batch folders, and after each batch has finished running, we merge the outputs.

The problem we are facing right now is that we have a total of 5 TB of data waiting to be merged. The output files differ, but to give you some perspective, a typical output file is around 70 GB in total. This is my code so far:

import os

import dask
import xarray as xr
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    queue="compute",
    interface="ens4",
    cores=4,
    memory="10GB",
    python="/home/dteber_woodwellclimate_org/dask_ws/.venv/bin/python3",
    walltime="06:00:00",
)
cluster.scale(100)
client = Client(cluster)

delayed_objs = []

# `file` holds the name of one output file (set outside this snippet)
multiple_ds = xr.open_mfdataset(
    f"/mnt/exacloud/dteber_woodwellclimate_org/full-iem/batch_*/output/{file}",
    chunks={"X": 100},
    engine="h5netcdf",
    combine="nested",
    concat_dim="y",
    parallel=True,
)
multiple_ds = dask.delayed(multiple_ds)
write_obj = multiple_ds.to_netcdf(
    "/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/another_test.nc",
    engine="h5netcdf",
)
delayed_objs.append(write_obj)

dask.compute(*delayed_objs)

This code fails with the following error:

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
Cell In[7], line 11
      8 write_obj = multiple_ds.to_netcdf("/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/another_test.nc", engine="h5netcdf")
      9 delayed_objs.append(write_obj)
---> 11 dask.compute(*delayed_objs)

File ~/dask_ws/.venv/lib/python3.11/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    657     postcomputes.append(x.__dask_postcompute__())
    659 with shorten_traceback():
--> 660     results = schedule(dsk, keys, **kwargs)
    662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/dask_ws/.venv/lib/python3.11/site-packages/distributed/client.py:2417, in Client._gather(self, futures, errors, direct, local_worker)
   2415     exception = st.exception
   2416     traceback = st.traceback
-> 2417     raise exception.with_traceback(traceback)
   2418 if errors == "skip":
   2419     bad_keys.add(key)

MemoryError: Task 'finalize-31e15f59-596c-42cf-8c20-23f1dda040aa' has 48.69 GiB worth of input dependencies, but worker tcp://10.0.0.134:42489 has memory_limit set to 2.33 GiB. It seems like you called client.compute() on a huge collection. Consider writing to distributed storage or slicing/reducing first.

In this specific example, it tries to write a file that is approximately 66 GB. This finalize task is the last task I see in the task graph; it finalizes the writing operation. I was expecting the write to be carried out by all of the workers, but it seems it is done by a single worker. Since no single worker has enough memory for that, it complains and throws a MemoryError.

When I comment out multiple_ds = dask.delayed(multiple_ds), the operation takes a lot of time but completes without any errors: it takes 13 minutes to merge 49 GB worth of data. Additionally, the task stream moves very slowly, and one worker takes on many more tasks than the other workers:

[task stream screenshot]
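For reference, this is roughly what the variant that completes (slowly) looks like, i.e. calling to_netcdf directly on the dask-backed dataset instead of going through dask.delayed:

multiple_ds = xr.open_mfdataset(
    f"/mnt/exacloud/dteber_woodwellclimate_org/full-iem/batch_*/output/{file}",
    chunks={"X": 100},
    engine="h5netcdf",
    combine="nested",
    concat_dim="y",
    parallel=True,
)
# no dask.delayed() wrapper: the write happens when to_netcdf is called
multiple_ds.to_netcdf(
    "/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/another_test.nc",
    engine="h5netcdf",
)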

I tried the same operation using Zarr but got the same error. Is it possible to distribute the write across multiple workers? If it’s not possible, can you give me some pointers on how to merge these big files?

Hi @dogukanteber, welcome to the Dask community!

Yep, this dask.delayed(multiple_ds) line is useless and probably messes everything up; you don’t need it. The write takes time because Dask cannot write concurrently into a single NetCDF file, only MPI-IO can do that. So the writes are sequential, with some locking, and probably some overhead.

I recommend using Zarr instead, and you should not get any error. You do need to make sure the inputs are chunked properly, but then the write should become embarrassingly parallel.
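As a rough sketch, reusing the open_mfdataset call from your snippet (the chunk sizes and the output path below are just placeholders to adapt to your data):

ds = xr.open_mfdataset(
    f"/mnt/exacloud/dteber_woodwellclimate_org/full-iem/batch_*/output/{file}",
    engine="h5netcdf",
    combine="nested",
    concat_dim="y",
    parallel=True,
)
# rechunk to uniform, reasonably sized chunks before writing;
# each chunk then becomes an independent Zarr write task
ds = ds.chunk({"y": 500, "X": 500})
ds.to_zarr(
    "/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/merged.zarr",  # example output path
    mode="w",
)

With uniform chunks every worker just writes its own pieces to the store, there is no final gathering step onto a single worker.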

Hi @guillaumeeb! Thank you for your warm welcome.

I managed to get the writing to Zarr format working. I feel like there is still more to do to optimize the performance. I have 200 workers, each with 4 cores and 20 GB of memory. The operation takes almost two hours to merge 6.4 TB worth of data with this code:

import os

import dask
import gcsfs
import xarray as xr

fs = gcsfs.GCSFileSystem(project="<project-id>", token=None)

path = "/mnt/exacloud/dteber_woodwellclimate_org/full-iem/batch_0/output"
output_files = os.listdir(path)
output_files.remove("run_status.nc")  # exclude the status file from the merge
pattern = "/mnt/exacloud/dteber_woodwellclimate_org/full-iem/batch_*/output/{file}"

for file in output_files:
    # one Zarr store per output file, written directly to the GCS bucket
    gcsmap = gcsfs.mapping.GCSMap(f"<path>/{file}", gcs=fs, check=False, create=True)
    multiple_ds = xr.open_mfdataset(
        pattern.format(file=file),
        chunks="auto",
        engine="h5netcdf",
        combine="nested",
        concat_dim="y",
        parallel=True,
    )
    delayed_obj = multiple_ds.to_zarr(store=gcsmap, compute=False)

    print(file)
    dask.compute(delayed_obj)

This is what the task stream looks like:

[task stream screenshot]

A couple of workers take the burden of processing while the others are barely working. What can I do to distribute the work evenly?

I am unsure what the ideal chunking would be, because my files vary. I am also a little bit confused about chunking when writing to Zarr. I’d appreciate it if you could give me some pointers and one concrete example of chunking.

Are you looping over single files here? Or am I misunderstanding?

How long does a single loop iteration take?

Why do you say a couple of workers take the burden? Looking at the dashboard, it seems several of them are pretty busy.

Yes, I’m looping over single files. I wanted to open a couple of files in one loop iteration, but doing that crashed my session several times, so I fell back to one file per iteration. I’m unsure if there’s a better way to write this loop.

The time for a single loop iteration depends on the file size. I have four heavy files that take 10-12 minutes each to finish; the other files take less than one minute.

The reason I said a couple of workers take the burden is that I was expecting each worker to have a similar task count. In the dashboard I provided, some workers seem to work much more than others. I was trying to distribute the work as evenly as possible. Is that a reasonable thing to expect?

https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes
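As a concrete, made-up example, reusing the pattern and gcsmap variables from your loop: assuming a dataset with dimensions time, y and X stored as float32 (adjust the names and sizes to your actual data), you could target chunks of roughly 100-200 MiB each:

ds = xr.open_mfdataset(
    pattern.format(file=file),
    engine="h5netcdf",
    combine="nested",
    concat_dim="y",
    parallel=True,
)
# 100 x 1000 x 500 float32 values is about 190 MiB per chunk
ds = ds.chunk({"time": 100, "y": 1000, "X": 500})
ds.to_zarr(store=gcsmap, compute=True)

The exact numbers matter less than keeping the chunks uniform and in that size range: big enough to amortize scheduling overhead, small enough to fit comfortably in worker memory.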

How big are these files? Which format are they in? Are they chunked?

Where are you writing the result?

Task scheduling and work stealing among workers is a complex topic. But from the look of your dashboard, everything seems fine: most workers have 4 tasks, so one task per core, and the task stream looks continuous, with no white space between tasks and no empty lines… So I’d say the work is distributed evenly.

Sorry for the late reply, I took some time off.

File sizes vary, but they can go up to 100 GB, and yes, they are chunked.

I’m writing the results to a GCP bucket.

It’s good to know that the workers have a good load. I was worried that I was under-utilizing the resources.

I think you should try to profile your workflow with a single big file to see where the problem is and which step takes most of the time. 10-12 minutes to write 100 GB is pretty long.
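A minimal way to do that, assuming the same pattern / gcsmap setup as in your loop, is to wrap the compute for one big file in a performance_report, which saves an HTML snapshot of the dashboard that you can inspect afterwards (the report file name is arbitrary):

from dask.distributed import performance_report

with performance_report(filename="merge-one-file.html"):
    ds = xr.open_mfdataset(
        pattern.format(file="<one big file>"),  # pick one of the ~100 GB files
        chunks="auto",
        engine="h5netcdf",
        combine="nested",
        concat_dim="y",
        parallel=True,
    )
    ds.to_zarr(store=gcsmap, compute=True)

That should show whether the time goes into reading the NetCDF inputs, rechunking, or uploading to the bucket.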

Thank you for your responses, @guillaumeeb. I improved the performance, but there is still room for improvement. I will dig deeper to see what the problem is.