Greetings all,
I’m working in an HPC environment (using dask_jobqueue with a PBSCluster) and am running into severe memory issues with very large datasets. The tasks use more memory than the scheduler expects, so the workers are repeatedly paused or killed. With LocalCluster, I can prevent this by specifying “MEMORY” as an abstract resource (as documented here) and telling the scheduler how much memory each submitted task is expected to use. However, I haven’t been able to get abstract resources to work with my PBSCluster. Is there a known or documented way to specify worker resources for a PBSCluster?
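For context, here is a minimal sketch of the LocalCluster setup described above. It assumes that LocalCluster forwards the `resources` keyword to its workers; the amounts (10 units on the worker, 1 per task) and the helper name `run_constrained_sum` are illustrative only, not taken from the real workload.

```python
from dask.distributed import Client, LocalCluster


def run_constrained_sum():
    """Run one resource-constrained task on an in-process worker."""
    # processes=False keeps the worker in this process, so the sketch
    # runs without a __main__ guard; dashboard_address=None skips the dashboard.
    with LocalCluster(
        n_workers=1,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,
        resources={"MEMORY": 10},  # illustrative: the worker advertises 10 units
    ) as cluster, Client(cluster) as client:
        # The task claims 1 unit of MEMORY while it runs.
        future = client.submit(sum, [1, 2, 3], resources={"MEMORY": 1})
        return future.result()


if __name__ == "__main__":
    print(f"Total: {run_constrained_sum()}")
```

Because the worker advertises a MEMORY resource, the scheduler can place the task; this is the behavior I am trying to reproduce on the PBSCluster.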
I’ve tried several approaches so far, but none of them work. These include using a context manager to set distributed.worker.resources.MEMORY and setting an environment variable, e.g., DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY=10.
Tasks submitted with a resource requirement are never executed, because the scheduler can’t find any worker that satisfies the requirement.
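One detail that may be worth checking with the environment-variable approach: Dask's configuration documentation describes `DASK_`-prefixed variables as being lowercased, with double underscores turned into nesting. The sketch below imitates that documented transform in plain Python; it is not Dask's own code, and `env_to_config_path` is a hypothetical helper written for illustration.

```python
import ast


def env_to_config_path(name, value, prefix="DASK_"):
    """Imitate the documented mapping from an env var to a config path."""
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    # Strip the prefix, lowercase, and split on double underscores.
    keys = name[len(prefix):].lower().split("__")
    # Try to read the value as a Python literal; otherwise keep the string.
    try:
        parsed = ast.literal_eval(value)
    except (SyntaxError, ValueError):
        parsed = value
    return keys, parsed


keys, parsed = env_to_config_path(
    "DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY", "10"
)
print(".".join(keys), "=", parsed)
```

If that mapping applies here, the variable would define a resource named `memory` rather than `MEMORY`, which would not match a task submitted with `resources={'MEMORY': 1}`. That would not by itself explain a worker reporting no resources at all, so it is probably only part of the picture.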
Below is a simple example, where I check to see what resources the worker reports as suggested in a previous Slack discussion (here).
from dask_jobqueue import PBSCluster
from dask.distributed import Client


def get_cluster_config(n_cores, max_memory, n_processes, job_prologue):
    cluster_config = {
        'cores': n_cores,
        'memory': f'{max_memory}GB',
        'processes': n_processes,
        'local_directory': '$WORKDIR/tmp',
        'resource_spec': f'select=1:ncpus={n_cores}:mpiprocs={n_processes}',
        'queue': 'debug',
        'account': 'ACCOUNT_NUM',
        'walltime': '00:05:59',
        'interface': 'ib0',  # InfiniBand
        'job_script_prologue': job_prologue,
    }
    return cluster_config


if __name__ == '__main__':
    cores_avail, mem, n_workers, n_jobs = 48, 190, 4, 1
    job_prologue = ['export DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY=10']

    print('Getting cluster config')
    cluster_config = get_cluster_config(cores_avail, mem, n_workers, job_prologue)
    print(cluster_config)
    print()

    print('Starting cluster')
    with PBSCluster(**cluster_config) as cluster:
        cluster.scale(n_jobs)
        with Client(cluster) as client:
            print('Client started...checking resources...\n')
            print(f'Reported Worker Resources: {client.run(lambda dask_worker: dask_worker.total_resources)}\n')

            print('Attempting to run resource constrained task...\n')
            future = client.submit(sum, [1, 2, 3], resources={'MEMORY': 1})
            print(future)
            print(f'Total: {future.result()}')
When this sample code runs, the workers report no resources, and the final task hangs in pending status:
$ python test_resources.py
Getting cluster config
{'cores': 48, 'memory': '190GB', 'processes': 4, 'local_directory': '$WORKDIR/tmp', 'resource_spec': 'select=1:ncpus=48:mpiprocs=4', 'queue': 'debug', 'account': '', 'walltime': '00:05:59', 'interface': 'ib0', 'job_script_prologue': ['module load singularity', 'conda activate scr', 'export DASK_DISTRIBUTED__WORKER__RESOURCES__MEMORY=10']}
Starting cluster
Client started...checking resources...
Reported Worker Resources: {}
Attempting to run resource constrained task...
<Future: pending, key: sum-7d4ed656da9a03ac563e2ef09a9ea8fd>
Thanks for any help you can provide!