Hi all,
We have a small-scale HPC cluster on GCP. We use Slurm as the job queue and Lustre as the parallel file system. Because the data we work with is too large to process in one go, we first split the input data into smaller batches and run each batch separately. The results are written to the corresponding batch folders, and once every batch has finished running, we merge the outputs.
The problem we are facing right now is that we have a total of 5 TB of data waiting to be merged. There are several different output files, but to give you some perspective, the total size of a typical output file across all batches is around 70 GB. This is my code so far:
import os

import dask
import dask.delayed
import xarray as xr
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(queue="compute", interface="ens4", cores=4, memory="10GB", python="/home/dteber_woodwellclimate_org/dask_ws/.venv/bin/python3", walltime="06:00:00")
cluster.scale(100)
client = Client(cluster)

delayed_objs = []

# `file` is the name of one of the output files (defined outside this snippet)
multiple_ds = xr.open_mfdataset(f"/mnt/exacloud/dteber_woodwellclimate_org/full-iem/batch_*/output/{file}", chunks={'X': 100}, engine="h5netcdf", combine='nested', concat_dim='y', parallel=True)

multiple_ds = dask.delayed(multiple_ds)
write_obj = multiple_ds.to_netcdf("/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/another_test.nc", engine="h5netcdf")
delayed_objs.append(write_obj)

dask.compute(*delayed_objs)
This code fails with the following error:
---------------------------------------------------------------------------
MemoryError Traceback (most recent call last)
Cell In[7], line 11
8 write_obj = multiple_ds.to_netcdf("/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/another_test.nc", engine="h5netcdf")
9 delayed_objs.append(write_obj)
---> 11 dask.compute(*delayed_objs)
File ~/dask_ws/.venv/lib/python3.11/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
657 postcomputes.append(x.__dask_postcompute__())
659 with shorten_traceback():
--> 660 results = schedule(dsk, keys, **kwargs)
662 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/dask_ws/.venv/lib/python3.11/site-packages/distributed/client.py:2417, in Client._gather(self, futures, errors, direct, local_worker)
2415 exception = st.exception
2416 traceback = st.traceback
-> 2417 raise exception.with_traceback(traceback)
2418 if errors == "skip":
2419 bad_keys.add(key)
MemoryError: Task 'finalize-31e15f59-596c-42cf-8c20-23f1dda040aa' has 48.69 GiB worth of input dependencies, but worker tcp://10.0.0.134:42489 has memory_limit set to 2.33 GiB. It seems like you called client.compute() on a huge collection. Consider writing to distributed storage or slicing/reducing first.
In this specific example, the file to be written is approximately 66 GB. The finalize task is the last task I see in the task graph, and it performs the actual write. I was expecting the write to be spread across all of the workers, but it appears to be carried out by a single worker, and since no single worker has that much memory available, the task fails with the error above.
When I comment out the multiple_ds = dask.delayed(multiple_ds) line, the operation completes without any errors, but it is slow: merging 49 GB worth of data takes 13 minutes. The task stream also moves very slowly, and one worker picks up many more tasks than the others.
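For reference, the variant without the delayed wrapper looks roughly like this (same paths and arguments as above, simplified to the essentials; the only real change is that to_netcdf is called directly on the dataset returned by open_mfdataset):

# Same lazy open as above
multiple_ds = xr.open_mfdataset(
    f"/mnt/exacloud/dteber_woodwellclimate_org/full-iem/batch_*/output/{file}",
    chunks={'X': 100},
    engine="h5netcdf",
    combine='nested',
    concat_dim='y',
    parallel=True,
)

# Writing directly (no dask.delayed wrapper) completes, but slowly
multiple_ds.to_netcdf(
    "/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/another_test.nc",
    engine="h5netcdf",
)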
I tried the same operation using zarr (sketched below) but got the same error. Is it possible to distribute the work of writing across multiple workers? If it's not possible, could you give me some pointers on how to merge these big files?
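The zarr attempt followed essentially the same pattern, roughly along these lines (the .zarr output path is illustrative):

# Same dataset as above, written to a zarr store instead of a single NetCDF file
multiple_ds = dask.delayed(multiple_ds)
write_obj = multiple_ds.to_zarr("/mnt/exacloud/dteber_woodwellclimate_org/full-iem/all_merged/another_test.zarr")
dask.compute(write_obj)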