Unfortunately, this solution does not appear to work as intended, which is part of the reason it took me so long to get back to the thread.
As I wrote in my initial message, the goal here is to provide some “elasticity”: a job can use lots of resources (dozens, if not hundreds, of nodes) without “hogging” them, because the workers run in a pre-emptable queue from which other jobs can “take back” nodes (cancelling some of the dask worker jobs).
In my tests, I used just a handful of nodes, and on each one I started workers with one of the following commands (the first also starts the scheduler; the other two join it through the shared scheduler file, thanks to --no-scheduler):
mpirun -np 56 dask-mpi --worker-class distributed.Worker --scheduler-file ./scheduler.json
mpirun -np 56 dask-mpi --worker-class distributed.Worker --scheduler-file scheduler.json --no-scheduler --name section2
mpirun -np 56 dask-mpi --worker-class distributed.Worker --scheduler-file scheduler.json --no-scheduler --name section3
My (test) Dask program (running in a separate Slurm job) was simply:
import random
import socket
import time

from distributed import Client

def costly_simulation(list_param):
    time.sleep(random.random() * 40)  # simulate a long-running task
    return list_param, socket.getfqdn()

client = Client(scheduler_file='./scheduler.json')

# input_params is defined elsewhere (a table of parameter sets)
delayed_results = []
for parameters in input_params.values:
    delayed_results.append(client.submit(costly_simulation, parameters))

print(client.gather(delayed_results))
client.close()
This worked fine while the three worker jobs were running. I could even “scale” the worker jobs up or down to more or fewer nodes, as long as the change happened while the Dask program was not running. That felt very promising!
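(For anyone trying to reproduce this: a quick way to watch which workers the scheduler currently knows about, from a separate process, is a polling loop along these lines. This is just an illustrative sketch, assuming the same scheduler.json as above.)

import time
from distributed import Client

client = Client(scheduler_file='./scheduler.json')

# Print the workers currently registered with the scheduler, a few times,
# to watch worker jobs come and go as Slurm starts or pre-empts them.
for _ in range(10):
    workers = client.scheduler_info()['workers']
    print(len(workers), 'workers:', sorted(info['name'] for info in workers.values()))
    time.sleep(30)

client.close()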
However, if I cancelled any of the dask-mpi jobs (killing its workers), the program either totally ignored the results from those workers (losing computation is very annoying, but we could live with it) or, even worse, failed entirely with this error:
distributed.scheduler.KilledWorker: Attempted to run task costly_simulation-1b1b4ae43171488678331ca9d6d1e3ce on 3 different workers, but all those workers died while running it.
And no, not all of those workers died, only one of them! Unfortunately, this is completely unacceptable for our use case (if it were acceptable, I’d simply run a single Dask job in the pre-emptable queue, which is very easy to configure and run, and works like a charm… if not killed).
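In case it helps others in the same situation, two knobs that look relevant (I have not fully validated them in this exact setup) are the scheduler’s distributed.scheduler.allowed-failures setting, which controls how many worker deaths a task may survive before KilledWorker is raised, and Client.gather(..., errors='skip'), which drops errored tasks instead of failing the whole gather. A minimal sketch, reusing costly_simulation and input_params from above:

# allowed-failures must be set in the scheduler's environment, i.e. for the
# first dask-mpi job, *before* it starts (setting it client-side has no effect):
#   export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=100

from distributed import Client

client = Client(scheduler_file='./scheduler.json')

# retries=10 resubmits tasks that raise ordinary exceptions; the KilledWorker
# accounting itself is governed by allowed-failures above.
futures = [client.submit(costly_simulation, p, retries=10)
           for p in input_params.values]

# errors='skip' drops futures whose tasks still failed instead of raising,
# so a pre-empted worker no longer kills the whole gather (its results are lost).
results = client.gather(futures, errors='skip')
print(results)
client.close()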
Moreover, if I increased the number of workers while the Dask program was running, I could see the new workers registering and becoming available immediately in the logs of the first dask-mpi job; however, the new workers did not participate in the computation of the running Dask job (they would only join subsequent jobs). This is also annoying, and perhaps what @jacobtomlinson meant by
Any comments or suggestions?