Memory allocation always <= 4GiB for distributed SLURMCluster workers

Hi all,

Summary
I’m seeing some unexpected behavior on a Slurm-managed HPC when I use a SLURMCluster instance to create dask workers with more than 4GB of memory each. Each worker is successfully allocated and granted the resources I request from the HPC, but dask isn’t recognizing the requested memory.

Set-up
Within python, I use dask_jobqueue to start a slurm cluster instance, which I pass to my client like so:

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
     cores=8,
     processes=1,
     queue='partition1, partition2',
     memory='2GB',
     walltime='6:00:00',
     job_extra=['--job-name="test"'],
     extra=["--lifetime", "55m", "--lifetime-stagger", "4m"]
  )

cluster.scale(4)
client = Client(cluster, timeout="1200s")
client.wait_for_workers()

Given this configuration, I would expect each worker to have just under 2GB of memory, and indeed that’s the case.
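For reference, here’s a quick sketch of how I check the limit each worker actually reports from the client side (the memory_limit field of scheduler_info() is in bytes):

# Print each worker's reported memory limit, as seen by the scheduler
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["memory_limit"])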

Issue
However, if I increase memory= beyond 4GB, I no longer create workers with the memory I’d expect. For instance, this set-up will create workers that all have only 4GB of memory, not 8:

cluster = SLURMCluster(
     cores=8,
     processes=1,
     queue='partition1, partition2',
     memory='8GB',
     walltime='6:00:00',
     job_extra=['--job-name="test"'],
     extra=["--lifetime", "55m", "--lifetime-stagger", "4m"]
  )

What I’ve tried
From my understanding of dask’s docs, this behavior suggests a hardware or other limit of 4GB that dask falls back to when the requested amount isn’t satisfiable. However, that doesn’t appear to be the case, as dask requests and is granted an 8GB job. (I also double-checked with an HPC admin.)

  1. I can check the dask job script, which looks as I’d expect:
>>> cluster.job_script()
'#!/usr/bin/env bash\n\n#SBATCH -J dask-worker\n#SBATCH -p partition1, partition2\n#SBATCH -n 1\n#SBATCH --cpus-per-task=4\n#SBATCH --mem=30G\n#SBATCH -t 6:00:00\n#SBATCH --job-name="test"\n\n/path/to/bin/python -m distributed.cli.dask_worker tcp://10.19.6.41:36691 --nthreads 4 --memory-limit 7.45GiB --name dummy-name --nanny --death-timeout 60 --lifetime 55m --lifetime-stagger 4m --protocol tcp://\n'
  2. I can check the processes running on the node and see that the memory limit passed as an argument is ~8GB:
bash /var/spool/slurmd/job56791996/slurm_script
  └─python -m distributed.cli.dask_worker tcp://10.18.1.58:44853 --nthreads 4 --memory-limit 7.45GiB --name SLURMCluster-0 --nanny --death-timeout 60 --protocol tcp://
      ├─python -c from multiprocessing.resource_tracker import main;main(12)
      ├─python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=13, pipe_handle=19) --multiprocessing-fork
      │   └─4*[{python}]
      └─4*[{python}]
  3. I can check the cgroup limits on that node, which are correctly set to 8GB:
sh02-08n62 $ cat /sys/fs/cgroup/memory/slurm/uid_297147/job_56791996/memory.limit_in_bytes
8589934592
  4. And if I run the worker command from that job script outside of dask, I can create a dask worker with the memory I’d expect:
$ /path/to/bin/python -m distributed.cli.dask_worker tcp://10.19.6.41:36691 --nthreads 4 --memory-limit 7.45GiB --name dummy-name --nanny --death-timeout 60 --lifetime 55m --lifetime-stagger 4m --protocol tcp://

2022-06-30 14:39:47,184 - distributed.worker - INFO -   Start worker at: tcp://171.66.103.162:44442
2022-06-30 14:39:47,184 - distributed.worker - INFO -     Listening to: tcp://171.66.103.162:44442
2022-06-30 14:39:47,184 - distributed.worker - INFO -     dashboard at:   171.66.103.162:38304
2022-06-30 14:39:47,184 - distributed.worker - INFO - Waiting to connect to:   tcp://10.19.6.41:36691
2022-06-30 14:39:47,184 - distributed.worker - INFO - -------------------------------------------------
2022-06-30 14:39:47,184 - distributed.worker - INFO -       Threads:             4
2022-06-30 14:39:47,184 - distributed.worker - INFO -        Memory:         7.45 GiB
2022-06-30 14:39:47,185 - distributed.worker - INFO -   Local Directory: /path/to/dask-worker-space/worker-ajlixlob
2022-06-30 14:39:47,185 - distributed.worker - INFO - -------------------------------------------------
2022-06-30 14:39:48,954 - distributed.worker - INFO -    Registered to:   tcp://10.19.6.41:36691
2022-06-30 14:39:48,954 - distributed.worker - INFO - -------------------------------------------------
2022-06-30 14:39:48,954 - distributed.core - INFO - Starting established connection

In contrast, every worker I start through a SLURMCluster instance with more than 4GB requested looks like this:

...
2022-MM-DD HH:MM:SS,000 - distributed.worker - INFO -        Memory:         4.00 GiB

...

and, as expected, these 4GB workers fail on memory-intensive jobs.

Question

  • Anyone know what’s going on here? Why 4GB?
  • Any pointers for how I should be setting things up differently?
  • Since it seems like the correct resources are being requested for each worker, would a workaround be to change or disable a dask memory configuration?

You’re hitting this line, which in turn goes to distributed/system.py at main · dask/distributed · GitHub.

Please verify that MEMORY_LIMIT matches the actual memory:

import psutil
import distributed.system

client.run(lambda: (psutil.virtual_memory(), distributed.system.MEMORY_LIMIT))

If it doesn’t, you need to investigate cgroups and rlimit (as described in the code for system.py linked above).
If it does, you need to investigate whatever initialises the VMs.
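As a minimal sketch of that check (the report_limits helper below is just illustrative, not part of distributed), you can also report the cgroup limit and the RSS rlimit on each worker, since system.py takes the minimum of total RAM, the cgroup limit, and that rlimit:

def report_limits():
    # Gather the quantities distributed/system.py compares, on this worker
    import os
    import resource
    import psutil
    import distributed.system

    cg_path = "/sys/fs/cgroup/memory/memory.limit_in_bytes"
    cgroup_limit = int(open(cg_path).read()) if os.path.exists(cg_path) else None
    return {
        "virtual_memory_total": psutil.virtual_memory().total,
        "cgroup_limit": cgroup_limit,
        "rlimit_rss_soft_hard": resource.getrlimit(resource.RLIMIT_RSS),
        "MEMORY_LIMIT": distributed.system.MEMORY_LIMIT,
    }

client.run(report_limits)  # runs on every connected worker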


Thanks @crusaderky for pointing me to the relevant source :beers:

It looks like distributed.system.MEMORY_LIMIT is indeed ~4GB, but I’m having trouble seeing why after going through the code you linked:

import dask
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from distributed import system
import psutil
import resource
import sys

cluster = SLURMCluster(
    cores=8,
    processes=1,
    queue='partition1, partition2',
    memory='120GB',
    walltime='2:00:00',
    job_extra=['--job-name="test"']
)
cluster.scale(8)
client = Client(cluster, timeout="1200s")
client.wait_for_workers()

client
# <Client: 'tcp://10.19.6.48:34610' processes=0 threads=0, memory=0 B>

cluster
# SLURMCluster(beb6067b, 'tcp://10.19.6.48:34610', workers=0, threads=0, memory=0 B)

system.MEMORY_LIMIT
# 4294967296

psutil.virtual_memory()
# svmem(total=269888774144, available=246888714240, percent=8.5, used=19844993024, free=229023993856, active=9289097216, inactive=7393382400, buffers=860160, cached=21018927104, shared=458452992, slab=20230512640)

# ~~~~~
# From
# https://github.com/dask/distributed/blob/75f4635b05034eba890c90d2f829c3672f59e017/distributed/system.py#L23-L30
# ~~~~~

sys.platform
# 'linux'

with open("/sys/fs/cgroup/memory/memory.limit_in_bytes") as f:
    cgroups_limit = int(f.read())

cgroups_limit
# 9223372036854771712

limit = psutil.virtual_memory().total
limit = min(limit, cgroups_limit)

limit
# 269888774144

hard_limit = resource.getrlimit(resource.RLIMIT_RSS)[1]

hard_limit
# 4294967296
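
# ~~~~~
# Sketch of the remaining step (following the linked code in system.py): the
# rlimit hard limit gets folded in with another min(), which is where the
# 4GiB comes from.
# ~~~~~

if hard_limit > 0:
    limit = min(limit, hard_limit)

limit
# 4294967296  (== system.MEMORY_LIMIT)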

Am I missing something?

Edit: I see I’m bumping up against the RSS rlimit.

What would you advise doing here? I see that my issue largely overlaps with this GitHub issue.

I don’t see overlap. Your problem is that the rlimit is less than the amount of memory you’re mounting and you need to have it increased. Something to figure out with whatever spun up the VMs.

The overlap is that on newer Linux kernels, cgroups have replaced rlimits, yet dask uses rlimits to determine a “hard limit.” For instance, on my Slurm cluster with a >2.3 Linux kernel, I can’t change the RSS rlimit with the Slurm argument --propagate as was possible with older kernels.

In any case, since the nuances of the linked GitHub issue and its relevance are over my head:

  1. Could you please clarify what you mean by “Something to figure out with whatever spun up the VMs”?
  2. Can I address this issue by passing the worker’s memory_limit argument explicitly, to avoid it defaulting to system.MEMORY_LIMIT? (source here) A sketch of what I mean is below.
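For concreteness, here’s a sketch of what I mean in point 2, using the extra keyword that dask-jobqueue already forwards to the worker CLI. (Note that the generated job script above already passes --memory-limit 7.45GiB and the worker still reports 4GiB, so I’m not sure this alone avoids the cap.)

# Illustrative only: forward an explicit --memory-limit to the dask worker CLI
cluster = SLURMCluster(
    cores=8,
    processes=1,
    queue='partition1, partition2',
    memory='8GB',
    walltime='6:00:00',
    job_extra=['--job-name="test"'],
    extra=["--memory-limit", "8GB"],
)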

Hi everyone, just chiming in on two things.

First, there is something I don’t understand: why does the worker started with dask-jobqueue hit this 4GB limit, but not when you launch it manually? The limit seems to be set directly in the dask.distributed code, so it shouldn’t matter whether dask-jobqueue starts the worker or you do it manually with the same kind of reservation in Slurm.

Second, to clarify:

We’re in an HPC context here, so there are no VMs. The Slurm job scheduler just books part of a node and, if everything is fine (and configured as it appears from the messages above), it creates a cgroup container with the requested resources (for example 8 cores and 8GB of RAM), and the Dask worker is started inside this cgroup container.

Again, doing this with dask-jobqueue or by hand should lead to the same result.

Hi @guillaumeeb, thanks for chiming in. I’m also puzzled by the discrepancy between starting the worker manually and letting dask-jobqueue do it, but it does trace back to the RSS rlimit here.
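A quick way to narrow this down (just a sketch, not something I’ve verified) would be to print the RSS rlimit in both environments and compare:

import resource

# Run this both in the shell where the manual dask_worker launch worked and
# inside the Slurm batch job; differing (soft, hard) values would explain why
# only the jobqueue-launched worker is capped.
print(resource.getrlimit(resource.RLIMIT_RSS))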

I’m puzzled because, as indicated in my first post, the cgroup limits are correct, which as you note is the important part. Any insight as to why the hard limit exists in the code linked above?

You can see the cgroup check above; I’d say it’s just a safety check, and I’m not sure how you can get around it.

I’d just like to understand why you hit this code path with dask-jobqueue and not when running on the command line; you should run into it in both cases. Unfortunately, I don’t have the time to look into this, sorry.
