Some questions about SLURMCluster

Hi

I have some questions about using SLURMCluster:

  1. I’m trying to pass python files in the parent directory to the workers. I tried different ways including sys.path.append, setting PYTHONPATH in my .bashrc file, and setting PYTHONPATH in env_extra (it seems DASK version can’t recognize job_script_prologue), but they all fail. The script works only when the imported Python files are in the same directory (or a descendant) as the script. I just wonder if this is really the case (which means my code should be written to be called from the top directory), or I’m missing something here?
  2. In comparison between a single machine DASK distributed and SLURMCluster, I was able to launch 40 workers on a single machine cluster, but when I tried that with the SLURMCluster, I received Worker exceeded 95% memory budget. I wonder if there is a difference in memory allocation in both configurations?
  3. Also, I tried to use with performance_report with SLURMCluster to record some statistics in an HTML file, but it seems it does not work with dask_jobqueue, right?

Regards

Hi @shambakey1, welcome to this forum!

So 1.
Could you give us some example code you use for doing all that? Which dask-jobqueue version are you using? You should be able to have access to your modules if they are on a shared file system with the correct setting or PYTHONPATH for workers. There is some documentation about that here:
https://docs.dask.org/en/stable/how-to/manage-environments.html#single-machine-schedulers

Alternatively, you can also try the UploadFile or UploadDirectory plugins.

What could really help would be to have some reproducible example of that, I’ve seen several thread here or on Stackoverflow related to this, it seems it’s difficult to get right. So have some project hierarchy we want to import into workers, and a small code example.

What are you doing with LocalCluster, and what are you doing with SlurmCluster, could you give us some code snippet? There should not be any difference into memory configurations by default between the two Cluster Managers.

performance_report should work with any Distributed cluster, can you share some code example where it’s not working?

Hi

Thank you @guillaumeeb for your help, and very sorry for the late reply. I had to re-run some tests.

  1. The first issue was avoided by using the shared folder, as you suggested, on the Slurm cluster. However, my question was about adding python files in the parent directory of my current working directory, but it does not matter anymore, since using the shared folder resolves it.
  2. A snippet of the LocalCluster code:
client = Client(n_workers=nworkers, threads_per_worker=nthreads_per_worker)

with performance_report(filename="dask_report_map_"+anal_timing_id+"_"+str(time.time())+".html"):
    tmp_paw_computations=client.map(PlantAvailableWater_numpy_dask,poro_files[:max_num_requests],satur_files[:max_num_requests],[set_key_dims for i in range(max_num_requests)])
    
    final_paw_results=client.gather(tmp_paw_computations)

while the code snippet for SLURMcluster looks as follows (some options in the SLURMCluster() are old as I’m restricted to the available modules at my institution):

num_processes_per_job=10 # Setting number of processes per job to int(os.cpu_count()/2) leads to some workers exceeding %95 of memory. So, it may be better to reduce number of of processes pre job
num_cores_per_job=num_processes_per_job # Number of threads per job
num_gpus_per_job=4
num_jobs=2

# SOME OTHER STEPS

if __name__ == '__main__':
    
    # SOME OTHER STEPS
    
    cluster = SLURMCluster(
        cores=num_cores_per_job,
        processes=num_processes_per_job,
        memory="12GB",
        shebang="#!/usr/bin/env bash",
        queue=queue,
        scheduler_options={"dashboard_address": ":"+str(port)},
        walltime="01:00:00",
        local_directory=os.environ["SCRATCH_deepacf"],
        death_timeout="60m",
        log_directory=f'{os.environ["HOME"]}/dask_jobqueue_logs',
        interface="ib0",
        project=project,
        #job_script_prologue
        env_extra= ['source $HOME/jupyter/kernels/dg_rr_analytics/bin/activate'], # Loading special python environment before calculations. This parameter should be replaced by job_script_prologue in the future
        python="$HOME/jupyter/kernels/dg_rr_analytics/bin/python",
        job_extra=['--gres gpu:'+str(num_gpus_per_job)]
        )

    cluster.scale(jobs=num_jobs) # Without this step, no job has been launched (yet)
    client = Client(cluster)

    # SOME OTHER STEPS
    
    #with performance_report(filename="dask_jobqueue_report_map_"+anal_timing_id+"_"+str(time.time())+".html"):
    tmp_paw_computations=client.map(PlantAvailableWater_numba_cuda_dask_with_data_preparation,poro_files[0:max_num_requests],satur_files[0:max_num_requests],[set_key_dims for i in range(max_num_requests)],[os.path.join(libraries_path, "tmp.pfb") for i in range(max_num_requests)],[Alpha for i in range(max_num_requests)],[Nvg for i in range(max_num_requests)],[Sres for i in range(max_num_requests)],[nrw_paw_params for i in range(max_num_requests)],[(10,10) for i in range(max_num_requests)],[() for i in range(max_num_requests)],[paw_res_path_dask for i in range(max_num_requests)])

    final_paw_results=client.gather(tmp_paw_computations)

I face this memory problem when trying to get the same number of workers (40 workers) on SLURMCluster with 1 job, while launching 40 workers succeeds with the distributed cluster on a local machine.

  1. In the same SLURMCluster snippet above, if I un-commented the with performance_report statement (and of course, used proper indentation for the next steps), I receive the following error:
distributed.core - ERROR - Exception while handling op performance_report
Traceback (most recent call last):
  File "/p/software/juwels/stages/2022/software/dask/2021.9.1-gcccoremkl-11.2.0-2021.4.0/lib/python3.9/site-packages/distributed/core.py", line 502, in handle_comm
    result = await result
  File "/p/software/juwels/stages/2022/software/dask/2021.9.1-gcccoremkl-11.2.0-2021.4.0/lib/python3.9/site-packages/distributed/scheduler.py", line 7356, in performance_report
    task_stream = self.get_task_stream(start=start)
  File "/p/software/juwels/stages/2022/software/dask/2021.9.1-gcccoremkl-11.2.0-2021.4.0/lib/python3.9/site-packages/distributed/scheduler.py", line 7030, in get_task_stream
    return plugin.collect(start=start, stop=stop, count=count)
  File "/p/software/juwels/stages/2022/software/dask/2021.9.1-gcccoremkl-11.2.0-2021.4.0/lib/python3.9/site-packages/distributed/diagnostics/task_stream.py", line 59, in collect
    start = bisect(start, 0, len(self.buffer))
  File "/p/software/juwels/stages/2022/software/dask/2021.9.1-gcccoremkl-11.2.0-2021.4.0/lib/python3.9/site-packages/distributed/diagnostics/task_stream.py", line 48, in bisect
    startstop["stop"] for startstop in self.buffer[mid]["startstops"]
KeyError: 'startstops'
'startstops'
  File "/p/home/jusers/elshambakey1/juwels/mini_dg_rr/scripts/paw_benchmarking_dask_jobqueue_map.py", line 188, in <module>
    traceback.print_stack()

So, I’m using other ways like Python logging module (as suggested in the documents), and recording my own JSON files. But I just wonder if I’m doing something wrong, or if there is some way to enable performance_report with the SLURMCluster?

Regards

This should be doable with the appropriate configuration of environment variables like PYTHONPATH, but I know that this is easier said than done.

This seems quite low memory for the total job, is this normal? How much memory do you have on your Slurm servers?

How are your configuring LocalCluster?

Theoretically, performance_report should work well with any Distributed cluster. I see that the environment where you are running your main code might not be the same as the one you configure for your Workers? Any reason for this? What are your versions on Dask package on each of those environments?

Hi

Very sorry again for the late reply.

  1. For the memory part, yes it is quit a lot. Strange thing, when I set memory size to something around 10 GB, it starts complaining about memory spilling and worker exceeding %95 of memory budget or something like that. I don’t know why. I didn’t receive these problems when running the distributed scheduler on a local machine.
  2. The LocalCluster is implemented as a distributed dask scheduler running on a single machine with number of processes, and number of threads per process.
  3. For the performance_report when running dask.distributed integrated with SLURMCluster, I activate my Python environment first before running the main code. But as a precaution, I also activate the same python environment in the SLURMCluster configuration to be sure the workers are using the same Python environment.

Regards

Well, I was saying the opposite: 12GB for 10 processes is really low, this means 1.2 GB per process.

So you don’t give any memory limits here? This means that Dask will infer the settings, trying to use al the available memory on your machine. How much memory is available on your server?

You shouldn’t have to do this. But does your computation ends correctly, or do you have memory errors? If the computation doesn’t end properly, this might explain why performance_report is not working.