Scheduling issues on SLURMCluster for transforming large arrays

Hi,

I’m trying to rechunk a large-ish dataset (a bit less than 20TB) and would like to use a SLURMCluster for this task.
I tried both, using rechunker and xarray.open_zarr(..., chunks={...}).to_zarr(..., encoding={x: {"chunks": (...)}). In any case, I ended up with dask chunks which are 256 MB or less and I’ve reserved about 2 GB per dask worker, so I’d expect this to run fine. For the xarray case, I also ensured that each output chunk depends on a single input chunk only, so no complex inter-task dependencies.

When I run this operation on a LocalCluster, this works indeed smoothly (I mostly get about 300MB of memory usage per worker and never more than 1GB).

When I run exactly the same operation on a SLURMCluster, it crashes relatively quickly. What I’ve observed is, that in each case (xarray or rechunker), I mostly end up having two kinds of tasks: one for reading the data and another one for writing the data (roughly 100k of each). On the LocalCluster, the corresponding write-task is usually scheduled immediately after it’s read-task has finished, which keeps the memory usage low. On the SLURMCluster, many many read-tasks are scheduled upfront and it takes some time until the first write-tasks are scheduled. In many cases, the memory of a worker fills up completely before the first writes are scheduled and subsequently the worker gets killed or pauses indefinitely.
Write-tasks do get scheduled eventually, but I’ve got to increase my memory-reservation by about a 100-fold to have enough free space for the write-tasks to become scheduled. However sometimes these workers still crash.

I can imagine that this may because read-tasks are all ready while write-tasks are not ready and it takes a while until write tasks are sent over to the workers once they become ready. Is this (kind of) intended behaviour? Should it be different?

May also want to ask at https://discourse.pangeo.io/

Hi! Thanks for posting this question. I’m not experienced at all with SLURMCluster, but I have a few guesses. Unfortunately, I don’t think there’s a ton that Dask itself can do here, but I’m happy to be proven wrong :smiley:

Mainly, I wonder if there is some difference between LocalCluster and SLURMCluster. Just to clarify, are you running the LocalCluster on your local machine, or on a single node of the cluster? If you’re running LocalCluster on your local machine, I wonder if resource differences between the machines causes the scheduler to behave differently.

Do you have the same number of workers between the LocalCluster case and the SLURMCluster case? If there are a lot more workers on the SLURM side, that may cause the scheduler to become a bottleneck in terms of scheduling tasks.

Anyhow, I don’t know that any of that helps, but I would second the suggestion to ask in a domain-specific forum just in case someone there has experience with a similar dataset size. If you want to post here an MVCE to help us start to reproduce this, that’d be great. Obviously, sharing 20TB of data is tough, but maybe there’s a more minimal case that can reproduce the problem?

1 Like

As @raybellwaves mentions, there are some discussions on the Pangeo community about this:

And some issues in distributed, mainly:

Your case seems really simple, so I second the idea to provide a MVCE. Dask should definitly be more aware of its workers memory and not read data if workers memory is full. This is a bit strange the LocalCluster achieves this, and not big distributed ones.

1 Like

Thanks for all the replies and the pointers to github and pangeo, I’ll dig more through them.
I’ll also try to come up with an nice MVCE.

@bryanweber I did run the LocalCluster on the smae kind of machine as the workers within the SLURMCluster and I did configure each SLURM job the same way as the LocalCluster has been configured, thus having two slurm jobs would give twice the number of workers as compared to the LocalCluster. So this will create more load on the Scheduler, but I even had those problems with only a single single-threaded worker per job and only 3-4 jobs concurrently.

A MVCE might be described here:

1 Like

What I’ve observed is, that in each case (xarray or rechunker), I mostly end up having two kinds of tasks: one for reading the data and another one for writing the data

This sounds very much like the problem I was having. https://www.youtube.com/watch?v=ftlgOESINvo

See recent changes in distributed to address this Share your experiences with `worker-saturation` config to reduce memory usage · dask/distributed · Discussion #7128 · GitHub