Create a (Slurm) cluster with different job submission parameters

We submit jobs to Slurm as described in the Dask-Jobqueue "Example Deployments" documentation, and it works fine in our environment.

However, we would like to balance using more computing resources (which we have) against playing nice with other users. A good way to do that is to submit to a pre-emptable queue, so our gazillion Dask jobs run only as long as nothing more urgent is pending. That also works okay, but it has the drawback that sometimes all of the gazillion jobs get pre-empted and there is no progress anymore.

It would be fantastic if we could submit a Dask cluster with a few jobs in a non-pre-emptable queue, guaranteeing progress (albeit at a slow pace), while at the same time, as part of the same Dask cluster, having many more jobs in the pre-emptable queue (which is otherwise identical to the regular queue) to take advantage of additional computing resources.

Is this at all possible? From the documentation it appears it's not, but I'd think this should be a common use case, so perhaps we're missing something? We can have Slurm configured differently if that helps.

I've seen this done with dask-mpi but not with dask-jobqueue.

With Dask MPI you can submit two jobs, one with a scheduler and one without (the workers connect to the first job's scheduler). That way you can choose different queues. However, Dask MPI doesn't support things like scaling, so if that's important to you then it might not be a good route.

I've already written this answer today for another topic. The idea is, as @jacobtomlinson says, to use the same Scheduler within two dask-jobqueue clusters.

There is a long-standing issue to implement this in dask-jobqueue.

Any help appreciated!

If you can modify the Slurm configuration, you might be able to achieve your goal another way: there is probably a way in Slurm to configure how many of a single user's jobs may be preempted on a given queue or QOS.

Thanks to both and sorry for the delay in following up.

Can @jacobtomlinson or @guillaumeeb elaborate a bit on how you would achieve that with Dask MPI? My understanding is that when using the Dask MPI API, all of Dask runs in a single (potentially multi-node) job, and as such, if/when Slurm (or another non-Dask scheduler; the reuse of terminology makes talking about this rather confusing) pre-empts that job, it's all gone! So my understanding is that this would make matters worse, but I am probably missing something and would appreciate elucidation.

Regarding the suggestion to configure, on the Slurm side, how many of a user's jobs may be preempted on a given queue or QOS: there are some ways, but they are all very unsatisfying for one reason or another. Having users configure the number of "mandatory" and pre-emptable jobs within Dask itself would be ideal.

With dask-mpi you can submit multiple jobs; you just specify --no-scheduler on the subsequent jobs and the workers will connect to the scheduler created by the first job.

https://mpi.dask.org/en/latest/interactive.html

So for your use case I expect you would submit one job that creates the scheduler and workers and do that on a queue that won’t get preempted, then submit one or more additional jobs to the preemptible queue.
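
A rough sketch of what that could look like end to end, assuming both jobs share the same scheduler file (the worker counts and file name below are placeholders):

# Job 1 (non-preemptable queue):  mpirun -n 4  dask-mpi --scheduler-file scheduler.json
# Job 2+ (preemptable queue):     mpirun -n 32 dask-mpi --scheduler-file scheduler.json --no-scheduler

from distributed import Client

# The client only needs the shared scheduler file; it does not care which
# Slurm job each worker was started from.
client = Client(scheduler_file="scheduler.json")
print(client.scheduler_info()["workers"])  # workers from both jobs appear here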

That sounds like a good solution, even if the submission is a bit more complicated. I'll give it a try later today, and if it works as I understand, it may be all that we need; if so, I'll close the thread.

Thanks

Unfortunately I can’t get dask-mpi to work at all. I tried various things but everything in Dask fails as below, even though other MPI jobs (including mpi4py ones) work just fine:

$ srun --pty --exclusive -n 10 bash
$ cat test_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
print(rank)

$ mpirun -n 10 python test_mpi.py
8
9
0
1
2
3
4
5
6
7

$ mpirun -n 10 dask-mpi --scheduler-file scheduler.json
2023-12-20 14:00:05,215 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2023-12-20 14:00:05,218 - distributed.scheduler - INFO - State start
2023-12-20 14:00:05,221 - distributed.scheduler - INFO -   Scheduler at:   tcp://192.168.2.7:42205
2023-12-20 14:00:05,221 - distributed.scheduler - INFO -   dashboard at:                     :8787
2023-12-20 14:00:05,230 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:44879'
2023-12-20 14:00:05,231 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:45675'
2023-12-20 14:00:05,231 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:38221'
2023-12-20 14:00:05,231 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:36765'
2023-12-20 14:00:05,231 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:46415'
2023-12-20 14:00:05,231 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:45485'
2023-12-20 14:00:05,231 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:39515'
2023-12-20 14:00:05,231 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:41833'
2023-12-20 14:00:05,232 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:33407'
--------------------------------------------------------------------------
It looks like orte_init failed for some reason; your parallel process is
likely to abort.  There are many reasons that a parallel process can
fail during orte_init; some of which are due to configuration or
environment problems.  This failure appears to be an internal failure;
here's some additional information (which may only be relevant to an
Open MPI developer):

  getting local rank failed
  --> Returned value No permission (-17) instead of ORTE_SUCCESS
--------------------------------------------------------------------------
--------------------------------------------------------------------------
It looks like orte_init failed for some reason; your parallel process is
likely to abort.  There are many reasons that a parallel process can
fail during orte_init; some of which are due to configuration or
environment problems.  This failure appears to be an internal failure;
here's some additional information (which may only be relevant to an
Open MPI developer):

  orte_ess_init failed
  --> Returned value No permission (-17) instead of ORTE_SUCCESS
--------------------------------------------------------------------------
--------------------------------------------------------------------------
It looks like MPI_INIT failed for some reason; your parallel process is
likely to abort.  There are many reasons that a parallel process can
fail during MPI_INIT; some of which are due to configuration or environment
problems.  This failure appears to be an internal failure; here's some
additional information (which may only be relevant to an Open MPI
developer):

  ompi_mpi_init: ompi_rte_init failed
  --> Returned "No permission" (-17) instead of "Success" (0)
--------------------------------------------------------------------------
*** An error occurred in MPI_Init_thread
*** on a NULL communicator
*** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,

Halving the number of MPI processes, to address the warning in the documentation, does not help either:

$ mpirun -n 5 dask-mpi --scheduler-file scheduler.json
2023-12-20 14:03:16,286 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2023-12-20 14:03:16,289 - distributed.scheduler - INFO - State start
2023-12-20 14:03:16,292 - distributed.scheduler - INFO -   Scheduler at:   tcp://192.168.2.7:34189
2023-12-20 14:03:16,292 - distributed.scheduler - INFO -   dashboard at:                     :8787
2023-12-20 14:03:16,302 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:40899'
2023-12-20 14:03:16,302 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:41659'
2023-12-20 14:03:16,302 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:34465'
2023-12-20 14:03:16,302 - distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.2.7:43743'
--------------------------------------------------------------------------
It looks like orte_init failed for some reason; your parallel process is
likely to abort.  There are many reasons that a parallel process can
fail during orte_init; some of which are due to configuration or
environment problems.  This failure appears to be an internal failure;
here's some additional information (which may only be relevant to an
Open MPI developer):

  getting local rank failed
  --> Returned value No permission (-17) instead of ORTE_SUCCESS
--------------------------------------------------------------------------
--------------------------------------------------------------------------
It looks like orte_init failed for some reason; your parallel process is
likely to abort.  There are many reasons that a parallel process can
fail during orte_init; some of which are due to configuration or
environment problems.  This failure appears to be an internal failure;
here's some additional information (which may only be relevant to an
Open MPI developer):

  orte_ess_init failed
  --> Returned value No permission (-17) instead of ORTE_SUCCESS
--------------------------------------------------------------------------
--------------------------------------------------------------------------
It looks like MPI_INIT failed for some reason; your parallel process is
likely to abort.  There are many reasons that a parallel process can
fail during MPI_INIT; some of which are due to configuration or environment
problems.  This failure appears to be an internal failure; here's some
additional information (which may only be relevant to an Open MPI
developer):

  ompi_mpi_init: ompi_rte_init failed
  --> Returned "No permission" (-17) instead of "Success" (0)
--------------------------------------------------------------------------
*** An error occurred in MPI_Init_thread
*** on a NULL communicator
*** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
***    and potentially your MPI job)
[node07.cluster:2840214] Local abort before MPI_INIT completed completed successfully, but am not able to aggregate error messages, and not able to guarantee that all other processes were killed!
2023-12-20 14:03:16,981 - distributed.nanny - INFO - Worker process 2840214 exited with status 1
2023-12-20 14:03:16,982 - distributed.nanny - WARNING - Restarting worker
*** An error occurred in MPI_Init_thread
*** on a NULL communicator
*** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
***    and potentially your MPI job)

Hi @davide, could you try using the --no-nanny option?

Thanks @guillaumeeb, the --no-nanny option (or better, --worker-class distributed.Worker) seems to have fixed the issue, and I've been able to run a hello-world test with it correctly.

It is strange that it did not accept my mpirun with half of the allocated MPI slots (the -n 5 example above), but oh well.

I will try more complex workflows soon and report back here, but (I don’t want to speak too soon) this looks like a very viable solution for our use case!

Thanks and happy holidays!

As for the explanation: it's not rare for MPI libraries to forbid forking or spawning new processes, which is incompatible with how the Nanny works.
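
If you prefer the Python API over the dask-mpi CLI, the interactive pattern from the documentation looks roughly like this (a sketch only; the worker_class keyword is my assumption, mirroring the --worker-class flag, so check the version you have installed):

from dask_mpi import initialize
from distributed import Client

# Start the scheduler (rank 0) and workers (ranks 2+); running plain Workers
# avoids forking extra processes under MPI, like --worker-class distributed.Worker.
initialize(worker_class="distributed.Worker")  # assumed mirror of the CLI flag

# The client code runs on rank 1 and picks up the scheduler address set by initialize().
client = Client()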

Unfortunately, this solution does not appear to work as intended, which is part of the reason it took me so long to get back to the thread.

As I wrote in my initial message, the reason for doing this is to provide some "elasticity", i.e. to allow a job to use lots of resources (say dozens if not hundreds of nodes) without "hogging" them, by using a pre-emptable queue from which other jobs can "take back" nodes (cancelling some of the Dask worker jobs).

In my tests I used just a handful of nodes, and on each node I started dask-mpi with one of the following commands:

mpirun -np 56 dask-mpi --worker-class distributed.Worker --scheduler-file ./scheduler.json

mpirun -np 56 dask-mpi --worker-class distributed.Worker --scheduler-file scheduler.json --no-scheduler --name section2

mpirun -np 56 dask-mpi --worker-class distributed.Worker --scheduler-file scheduler.json --no-scheduler --name section3

My (test) Dask program (running in a separate Slurm job) was simply:

import random
import socket
import time

def costly_simulation(list_param):
    time.sleep(random.random() * 40)
    return list_param, socket.getfqdn()

from distributed import Client
client = Client(scheduler_file='./scheduler.json')

# input_params (defined elsewhere) holds the parameter sets, e.g. a pandas DataFrame
delayed_results = []
for parameters in input_params.values:
    delayed_results.append(client.submit(costly_simulation, parameters))

print(client.gather(delayed_results))
client.close()

This worked fine when the three worker jobs were running. I could even “scale” the worker jobs up or down to more nodes as long as that change happened while the Dask program was not running. That felt very promising!

However, if I cancelled any of the dask-mpi processes (killing the workers), it either totally ignored the results from those workers (very annoying to lose computation, but we could live with it) or, even worse, it failed the whole program with this error:

distributed.scheduler.KilledWorker: Attempted to run task costly_simulation-1b1b4ae43171488678331ca9d6d1e3ce on 3 different workers, but all those workers died while running it. 

And no, not all those workers died, only one of them! Unfortunately, this is completely unacceptable for our use case (if it were acceptable, I'd simply run a single Dask job in the pre-emptable queue, which is very easy to configure and run, and works like a charm… if not killed).

Moreover, if I tried to increase the number of workers while Dask was running, I could see the new workers registering and becoming available immediately in the logs of the first dask-mpi job; however, the new workers did not participate in the computation for the running Dask job (they would join subsequent jobs). This is also annoying, and perhaps it is what @jacobtomlinson meant by Dask MPI not supporting things like scaling.

Any comments or suggestions?

I think this is because results are kept in Worker memory until all of them are completed, since you try to gather them all at once. Because you cancel the Worker jobs, the Workers probably do not end gracefully, and the results have no time to be transferred to a still-running Worker. You might want to try as_completed to prevent this.
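
Something like this is what I mean; a rough sketch based on your test program above:

from distributed import as_completed

futures = [client.submit(costly_simulation, parameters)
           for parameters in input_params.values]

results = []
for future in as_completed(futures):
    # each result is pulled back to the client as soon as it is ready,
    # so it is not lost if the worker that computed it is killed later on
    results.append(future.result())

print(results)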

This is more complicated. Why do you say only one Worker died? You launch several Workers per job, don't you? It could be a race condition: the task being scheduled onto two other workers that are also being killed along with the job.

One thing you could try is to increase the number of retries per task.
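
For example (a sketch; retries is a keyword accepted by Client.submit, and distributed.scheduler.allowed-failures is the configuration value behind the "3 different workers" limit in your error; note that the latter has to be set where the Scheduler starts, and the values here are arbitrary):

import dask

# The scheduler gives up on a task after it has been on N dying workers;
# the default is 3, matching the error message above. This must be configured
# in the environment where the scheduler process starts.
dask.config.set({"distributed.scheduler.allowed-failures": 10})

# Per-task automatic retries can also be requested at submission time.
futures = [client.submit(costly_simulation, parameters, retries=5)
           for parameters in input_params.values]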

This is really strange: I've always seen tasks being assigned to new workers made available to the Scheduler, even during a running computation.

I understand my comments are not very helpful… Dynamic scaling is complex.

Thanks a lot for getting back to me so quickly. I appreciate it.

I see. This makes sense, and it (obviously) happens in the non-as_completed runs using the SLURMCluster too (see below).

You are right, I was being sloppy: I meant all the Workers running on a node whose job was cancelled.

Sure, I see that. However it’s already robustly implemented in Dask! So one does not need to re-invent the wheel…

Evidence: if I use the SLURMCluster, that already works perfectly. Jobs can start whenever they want (or can) and can be cancelled at any time without any global implications at all: Dask automagically already has all the logic to use any worker currently up and running (and, as you said, with as_completed even partially computed results get gathered, which I assume works for the MPI version too, but I have only tried the Slurm version, not the MPI one). Everything works very nicely. The only problem is that the job scripts have to be the same for all jobs. To be clearer, if I run the following:

import random
import socket
import time

from dask_jobqueue import SLURMCluster
from distributed import Client

def costly_simulation(list_param):
    time.sleep(random.random() * 40)
    return list_param, socket.getfqdn()

cluster = SLURMCluster(
        cores=56,
        queue="regular")
cluster.scale(jobs=10)
client = Client(cluster)

# input_params as in the previous example
delayed_results = []
for parameters in input_params.values:
    delayed_results.append(client.submit(costly_simulation, parameters))

print(client.gather(delayed_results))
client.close()

the ten jobs from the SLURMCluster's scale() can start at any time during the run (perhaps because they were initially held back by the Slurm scheduler due to priority or resources) and can likewise be cancelled at any time (in this experiment I just cancelled them manually mid-run), and I always get a result, with contributions even from the jobs that started toward the end of the run.

So it seems to me that (at least for my use case) it would be easier to change the Cluster behavior to allow multiple batch scripts, rather than trying to retrofit robustness into the (admittedly more complicated) MPI logic? Have you seen what I proposed in the GitHub issue you mentioned earlier, and if so, what do you think about it?

Thanks again

Yes, probably! Let's continue the discussion there; a PR would be very welcome, as this is a long-awaited feature in dask-jobqueue.

I will be happy to provide a PR, but there are a few things that I would need to improve first. Most notably, I am calling scale twice, changing the queue (or any other parameter) in between by modifying cluster._job_kwargs. As is, in most cases this won't work, because the second call comes too close to the first, and so the actual number of jobs in each queue can differ depending on how quickly sbatch executes.

As a quick fix, I'm including a sleep in between; however, that only mitigates, not solves, the race condition. If, for example, the Slurm database or the HPC filesystem is experiencing a hang (not an uncommon situation in some production environments), what happens depends on the length of the sleep compared to the length of the hang.
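
Concretely, the quick fix looks roughly like this (a sketch: the queue names mirror the example below, _job_kwargs is internal dask-jobqueue state rather than a public API, and the sleep length is arbitrary):

import time

from dask_jobqueue import SLURMCluster

# other options (job_extra_directives, interface, ...) omitted; see the full snippet below
cluster = SLURMCluster(cores=1, memory="255GB", queue="regular")
cluster.scale(jobs=10)                    # the "guaranteed" jobs in the regular queue

time.sleep(60)                            # crude: hope the first sbatch calls have gone out
cluster._job_kwargs["queue"] = "preempt"  # switch the queue for jobs created from now on
cluster.scale_up(100)                     # many more jobs in the pre-emptable queue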

A proper fix would be to actually wait for the first jobs to be submitted, which, if my understanding is correct, happens asynchronously because of the logic in distributed/deploy/spec.py (https://github.com/dask/distributed/blob/main/distributed/deploy/spec.py).

I tried the following

import asyncio

from dask_jobqueue import SLURMCluster

async def wait_for_jobs_to_be_submitted(cluster):
    await asyncio.wait([asyncio.Future(cluster._correct_state)])

cluster = SLURMCluster(
        cores=1,
        job_extra_directives=['--exclusive',
            '--output=test-%j.log',
            '-J test',
            ],
        memory='255GB',
        queue="regular",
        job_directives_skip=['-t', '--cpus-per-task'],
        interface='ib0')
cluster.scale(jobs=10)

cluster._job_kwargs['queue'] = 'preempt'
asyncio.run(wait_for_jobs_to_be_submitted(cluster))
cluster.scale_up(100)

But that fails with "TypeError: An asyncio.Future, a coroutine or an awaitable is required", which is puzzling, since cluster._correct_state's MRO is <class '_asyncio.Task'>, <class '_asyncio.Future'>, <class 'object'>. I am not familiar enough with asyncio to understand why this is happening, so I'd need to dig deeper.

To properly solve this issue I think we'd need a PR in distributed too, not just in dask-jobqueue, and I think that would complicate matters. Do you have experience coordinating between these two projects?

Of course it is possible to propose changes in distributed that would benefit other downstream packages.

But it might also mean this change is not appropriate and we need to think about it further. Unfortunately, I don't really have the time right now.