Placing limits on scheduler memory

Is there a way to limit the memory usage by dask scheduler? The API documentation does not mention any such feature.

This is especially crucial when running dask-jobqueue's PBSCluster on an HPC cluster. As described in the dask-jobqueue configuration documentation, the scheduler runs on the head node of the cluster. When the scheduler takes up all the memory there, it becomes a problem, since it hinders other users of the cluster.

If there is no way to directly limit the memory of the scheduler, maybe there are other workarounds such as running the scheduler as a PBS job?


Hi @Kostusas, welcome here!

As far as I know there isn't. But I've never run into trouble with Scheduler memory. Do you see the scheduler using a lot of memory? Is your task graph very big, or something like that?

Yes, it's often possible to start the Scheduler inside a job, as long as the HPC cluster configuration allows it, which is not always the case. Just be careful of the walltime here.
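For example, a minimal PBS job script for this could look like the sketch below. The resource requests, walltime, and the `--scheduler-file` path are placeholders, not a tested configuration; the idea is to write the connection info to shared storage so workers and clients can find the scheduler:

```shell
#!/bin/bash
#PBS -N dask-scheduler
#PBS -l select=1:ncpus=1:mem=4gb
#PBS -l walltime=08:00:00

# Write connection info to shared storage so workers and clients can connect
dask-scheduler --scheduler-file "$HOME/dask-scheduler.json"
```

Workers and clients would then point at the same file, e.g. `dask-worker --scheduler-file ~/dask-scheduler.json` and `Client(scheduler_file=...)`.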

For other solutions, you'll need to ask the sys-admins of your supercomputer, maybe about using cgroups or similar functionality. It's also common for supercomputers to place per-user memory limits on head nodes through cgroups by default.
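For what it's worth, if the head node runs systemd you may be able to cap the scheduler's memory yourself with a transient cgroup scope (a sketch; the 4G limit is a placeholder, `MemoryMax` assumes a recent systemd with cgroup v2, and user scopes may need controller delegation enabled by the admins):

```shell
# Run the scheduler in a transient scope with a hard memory cap
systemd-run --user --scope -p MemoryMax=4G dask-scheduler
```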

Indeed, we see the scheduler take up >3GB of memory on the head node of the cluster. The task graph contains a couple of thousand tasks, which should not amount to such high memory usage.

Maybe you can advise on the best ways to troubleshoot the scheduler? For example, is there some way to estimate the memory usage before sending it to the scheduler?

Yep, that doesn’t sound like a huge graph. How are you reading input data? Can you give at least some code samples of what you are trying to do?


Could you just try to do a memory profile of the Scheduler during execution?
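One lightweight way to do that from the client side is `Client.run_on_scheduler`, which executes a function inside the scheduler process. A sketch, using stdlib `resource` for the measurement; the in-process `LocalCluster` is only for illustration — on HPC you would connect to the real scheduler instead:

```python
from dask.distributed import Client

def scheduler_memory(dask_scheduler=None):
    # Runs inside the scheduler process; reports its peak RSS (kilobytes on Linux)
    import resource
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

client = Client(processes=False)  # in-process cluster, for illustration only
print(client.run_on_scheduler(scheduler_memory))
```

Sampling this periodically during graph submission and execution would show when the memory grows.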

After reviewing the example, we realized that one of the functions in the graph held a reference to a big global variable, which was the likely source of the excessive memory usage. Is that a plausible cause?

Dask now warns if some of the inputs have an excessive size; would it make sense to have the same warning if the functions themselves are excessively large?

I guess so: Dask will serialize the function and everything that is needed to send the tasks to the workers, and it will keep all of that on the Scheduler side. It would be nice if you could reproduce this somehow.

Well, I think yes. Dask could maybe detect if a task graph is abnormally large, but I'm not sure how all this works under the hood. I would recommend building a reproducer and opening an issue on the Dask or Distributed GitHub repo if it is confirmed.

Here’s the minimal example we found:

import numpy as np
import dask.array

N = 30
array_shape = 200

dictionary = {key: np.random.rand(array_shape, array_shape) for key in range(N + 1)}

def test_function(a):
    # References the module-level `dictionary`, so it is captured when the
    # function is serialized and shipped with the task graph
    return np.sum(dictionary[int(a)]) * dictionary[int(a)]

dask_array = dask.array.linspace(0, N, N + 1, chunks=(1,))

# Apply test_function elementwise; each output block is a 200x200 array
mapped_array = dask.array.apply_gufunc(
    test_function,
    "()->(l,m)",
    dask_array,
    output_sizes={"l": array_shape, "m": array_shape},
    vectorize=True,
)

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=31)
client = Client(cluster)

client.scatter(dictionary, broadcast=True)
mapped_array.compute()



Monitoring the resource usage with top shows that the scheduler takes 500MB with this graph.


This is too deep into Scheduler mechanisms for me to provide any more insight here.

Let’s see if @fjetter has some time to look at this problem.

Yes, and this is a common cause of scheduler memory blowup. In general, the advice here is to not load data locally, but instead let Dask load the data:
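A sketch of the contrast (not the poster's code; `load_chunk` is a hypothetical stand-in for reading one file or partition per task):

```python
import numpy as np
import dask.array as da
from dask import delayed

# Anti-pattern: materialize data on the client, then hand it to Dask.
# The data ends up embedded in, or scattered alongside, the task graph.
local = np.random.rand(1_000, 1_000)
x = da.from_array(local, chunks=(250, 250))

# Better: describe how to produce each chunk and let the workers do the loading.
def load_chunk(seed):
    # Stand-in for e.g. reading one file per partition
    rng = np.random.default_rng(seed)
    return rng.random((250, 1_000))

chunks = [
    da.from_delayed(delayed(load_chunk)(i), shape=(250, 1_000), dtype=float)
    for i in range(4)
]
y = da.concatenate(chunks, axis=0)  # same shape as x, but loaded lazily on workers
```

With the second form, only small task descriptions pass through the scheduler; the actual bytes never leave the workers.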
