Abstract Resources with dask_jobqueue and PBSCluster

Greetings all,

I’m working in an HPC environment (using dask_jobqueue with a PBSCluster) and running into severe memory issues with very large datasets. The tasks use more memory than the scheduler expects, so the workers are repeatedly paused or killed. With LocalCluster, I can prevent this by specifying “MEMORY” as an abstract resource (as documented here) and telling the scheduler how much memory each task I submit is expected to use. However, I haven’t been able to get abstract resources to work with my PBSCluster. Is there a known or documented way to specify worker resources for a PBSCluster?
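
For reference, this is roughly the LocalCluster setup that works for me (values are just illustrative; as far as I understand, the resources kwarg here is forwarded to each worker):

from dask.distributed import Client, LocalCluster

# Each worker advertises an abstract "MEMORY" budget of 10 units; the scheduler
# only runs a task on a worker while the task's declared MEMORY fits in that budget.
with LocalCluster(n_workers=4, threads_per_worker=2, resources={'MEMORY': 10}) as cluster:
    with Client(cluster) as client:
        # This task declares that it needs 5 units of MEMORY
        future = client.submit(sum, [1, 2, 3], resources={'MEMORY': 5})
        print(future.result())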

I’ve tried several approaches so far, but none of them seem to work, including using a context manager to set distributed.worker.resources.MEMORY and setting the equivalent environment variable, e.g. DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY=10.
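
Concretely, the attempts looked roughly like this (sketch, using the same cluster_config as in the full example below; in both cases the workers still report no resources):

import dask
from dask_jobqueue import PBSCluster

# Attempt 1: set the config value via a context manager around cluster creation
with dask.config.set({'distributed.worker.resources.MEMORY': 10}):
    cluster = PBSCluster(**cluster_config)

# Attempt 2: export the equivalent environment variable in the job script prologue
job_prologue = ['export DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY=10']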

Tasks that are submitted with a resource requirement are never executed since the scheduler can’t find any workers that can satisfy the resource requirements.

Below is a simple example where I check what resources the workers report, as suggested in a previous Slack discussion (here).

from dask_jobqueue import PBSCluster
from dask.distributed import Client

def get_cluster_config(n_cores, max_memory, n_processes, job_prologue):

    cluster_config = {
        'cores': n_cores,                   # total cores per PBS job
        'memory': f'{max_memory}GB',        # total memory per PBS job, shared across worker processes
        'processes': n_processes,           # number of Dask worker processes per job
        'local_directory': '$WORKDIR/tmp',  # local scratch space for worker temporary files
        'resource_spec': f'select=1:ncpus={n_cores}:mpiprocs={n_processes}',  # PBS select statement
        'queue': 'debug',
        'account': 'ACCOUNT_NUM',
        'walltime': '00:05:59',
        'interface': 'ib0',  # InfiniBand
        'job_script_prologue': job_prologue,  # commands run in the job script before the worker starts
    }

    return cluster_config

if __name__ == '__main__':

    cores_avail, mem, n_workers, n_jobs = 48, 190, 4, 1  # 48 cores, 190 GB, 4 worker processes per job, 1 job
    job_prologue = ['export DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY=10']

    print('Getting cluster config')
    cluster_config = get_cluster_config(cores_avail, mem, n_workers, job_prologue)
    print(cluster_config)
    print()
    print('Starting cluster')

    with PBSCluster(**cluster_config) as cluster:

        cluster.scale(n_jobs)

        with Client(cluster) as client:

            print('Client started...checking resources...\n')
            print(f'Reported Worker Resources: {client.run(lambda dask_worker: dask_worker.total_resources)}\n')
            print('Attempting to run resource constrained task...\n')
            future = client.submit(sum, [1, 2, 3], resources={'MEMORY': 1})
            print(future)
            print(f'Total: {future.result()}')

When this sample code runs, it reports no worker resources, and the final task just hangs in pending status:

$ python test_resources.py
Getting cluster config
{'cores': 48, 'memory': '190GB', 'processes': 4, 'local_directory': '$WORKDIR/tmp', 'resource_spec': 'select=1:ncpus=48:mpiprocs=4', 'queue': 'debug', 'account': '', 'walltime': '00:05:59', 'interface': 'ib0', 'job_script_prologue': ['module load singularity', 'conda activate scr', 'export DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY=10']}

Starting cluster
Client started...checking resources...

Reported Worker Resources: {}

Attempting to run resource constrained task...

<Future: pending, key: sum-7d4ed656da9a03ac563e2ef09a9ea8fd>

Thanks for any help you can provide!

You should be able to specify Worker resources using the worker_extra_args kwarg; this is a bit hidden in the documentation.

So something like:

cluster_config = {
    ...
    'job_script_prologue': job_prologue,
    'worker_extra_args': ["--resources MEMORY=10"],  # Define the MEMORY resource for each Worker
}
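
Untested sketch on my end, but plugging it into your example would look roughly like this. As far as I know, worker_extra_args is simply appended to the worker launch command in the generated job script, so I’d expect each worker process to advertise the resource:

cluster_config = get_cluster_config(cores_avail, mem, n_workers, job_prologue)
cluster_config['worker_extra_args'] = ["--resources MEMORY=10"]

with PBSCluster(**cluster_config) as cluster:
    cluster.scale(n_jobs)
    with Client(cluster) as client:
        # Should now report the declared MEMORY resource for each worker instead of {}
        print(client.run(lambda dask_worker: dask_worker.total_resources))
        # And the resource-constrained task should actually run
        print(client.submit(sum, [1, 2, 3], resources={'MEMORY': 1}).result())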

Thank you – that did the job perfectly.

You’re right that it was quite well hidden in the documentation!
