Restarting workers on Slurm Cluster

Hi, I am trying to restart the workers on a Slurm cluster every time they finish a task, as the compute time for a single task can be rather long in my use case. I was trying to follow an approach proposed in https://github.com/dask/dask-jobqueue/issues/597:

for future in distributed.as_completed(
    client.map(do_one, list(range(100, 132)))
):
    who_has = client.who_has(future)
    closing = list(list(who_has.values())[0])
    client.retire_workers(closing)

However, that did not seem to cancel the jobs in the Slurm queue, meaning that jobs would eventually be killed by Slurm while a task was still running.

To simplify the problem, I tried to use Client.restart(), but again the jobs weren't cancelled in the Slurm queue. I used the following script:

import time

from dask.distributed import Client
from dask_jobqueue import SLURMCluster


cluster = SLURMCluster(
        queue='normal',
        cores=1,
        memory='10GB',
        job_directives_skip=['#SBATCH --mem='],
)
cluster.adapt(minimum_jobs=1, maximum_jobs=1)
client = Client(cluster)

future = client.submit(lambda: "Dummy job")
print(future.result())

print('restarting...')
client.restart()

future = client.submit(lambda: "Dummy job")
print(future.result())

Logs:

2023-09-22 16:30:31,768 - distributed.nanny - INFO -         Start Nanny at: 'tcp://123.456.1.15:35509'
2023-09-22 16:30:32,336 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-scratch-space-1004/worker-3kcjyjxs', purging
2023-09-22 16:30:32,860 - distributed.worker - INFO -       Start worker at:   tcp://123.456.1.15:35109
2023-09-22 16:30:32,860 - distributed.worker - INFO -          Listening to:   tcp://123.456.1.15:35109
2023-09-22 16:30:32,860 - distributed.worker - INFO -           Worker name:             SLURMCluster-0
2023-09-22 16:30:32,860 - distributed.worker - INFO -          dashboard at:         123.456.1.15:40399
2023-09-22 16:30:32,860 - distributed.worker - INFO - Waiting to connect to:  tcp://223.456.58.22:41265
2023-09-22 16:30:32,861 - distributed.worker - INFO - -------------------------------------------------
2023-09-22 16:30:32,861 - distributed.worker - INFO -               Threads:                          1
2023-09-22 16:30:32,861 - distributed.worker - INFO -                Memory:                   9.31 GiB
2023-09-22 16:30:32,861 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space-1004/worker-l2daiwki
2023-09-22 16:30:32,861 - distributed.worker - INFO - -------------------------------------------------
2023-09-22 16:30:32,867 - distributed.worker - INFO -         Registered to:  tcp://223.456.58.22:41265
2023-09-22 16:30:32,867 - distributed.worker - INFO - -------------------------------------------------
2023-09-22 16:30:32,868 - distributed.core - INFO - Starting established connection to tcp://223.456.58.22:41265
2023-09-22 16:30:32,904 - distributed.nanny - INFO - Nanny asking worker to close. Reason: scheduler-restart
2023-09-22 16:30:32,905 - distributed.worker - INFO - Stopping worker at tcp://123.456.1.15:35109. Reason: scheduler-restart
2023-09-22 16:30:32,905 - distributed.core - INFO - Connection to tcp://223.456.58.22:41265 has been closed.
2023-09-22 16:30:32,906 - distributed.nanny - INFO - Worker closed
2023-09-22 16:30:33,039 - distributed.nanny - WARNING - Restarting worker
2023-09-22 16:30:34,045 - distributed.worker - INFO -       Start worker at:   tcp://123.456.1.15:45387
2023-09-22 16:30:34,045 - distributed.worker - INFO -          Listening to:   tcp://123.456.1.15:45387
2023-09-22 16:30:34,045 - distributed.worker - INFO -           Worker name:             SLURMCluster-0
2023-09-22 16:30:34,045 - distributed.worker - INFO -          dashboard at:         123.456.1.15:41369
2023-09-22 16:30:34,045 - distributed.worker - INFO - Waiting to connect to:  tcp://223.456.58.22:41265
2023-09-22 16:30:34,045 - distributed.worker - INFO - -------------------------------------------------
2023-09-22 16:30:34,046 - distributed.worker - INFO -               Threads:                          1
2023-09-22 16:30:34,046 - distributed.worker - INFO -                Memory:                   9.31 GiB
2023-09-22 16:30:34,046 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space-1004/worker-v5wur63c
2023-09-22 16:30:34,046 - distributed.worker - INFO - -------------------------------------------------
2023-09-22 16:30:34,052 - distributed.worker - INFO -         Registered to:  tcp://223.456.58.22:41265
2023-09-22 16:30:34,052 - distributed.worker - INFO - -------------------------------------------------
2023-09-22 16:30:34,053 - distributed.core - INFO - Starting established connection to tcp://223.456.58.22:41265
slurmstepd: error: *** JOB 119484 ON node15 CANCELLED AT 2023-09-22T16:30:34 ***
2023-09-22 16:30:34,269 - distributed.worker - INFO - Stopping worker at tcp://123.456.1.15:45387. Reason: scheduler-close
2023-09-22 16:30:34,269 - distributed.core - INFO - Received 'close-stream' from tcp://223.456.58.22:41265; closing.
2023-09-22 16:30:34,271 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://123.456.1.15:40290 remote=tcp://223.456.58.22:41265>
Traceback (most recent call last):
  File "/home/di/anaconda/miniconda/envs/queens/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/home/di/anaconda/miniconda/envs/queens/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/home/di/anaconda/miniconda/envs/queens/lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
2023-09-22 16:30:34,274 - distributed.nanny - INFO - Closing Nanny gracefully at 'tcp://123.456.1.15:35509'. Reason: scheduler-close

Looking at the logs, it seems the worker process is restarted, but in my use case I need the Slurm job to be cancelled and a new job to be submitted through cluster.adapt(). How can I achieve this?

Hi @maxdinkel, welcome to Dask community!

I don’t think there is any solution for this currently. Dask was not conceived around the idea of one worker = one task.

Yes, client.restart won't stop the Slurm job; it will just cause the Nanny to restart the Worker process.

In your simple example, you’ll just need to use cluster.scale(0) and cluster.scale(1), but I don’t think this can be generalized to your real use case.
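
For the simple example, something along these lines is what I mean (just a rough sketch, with do_one standing in for your task):

future = client.submit(do_one, 100)
result = future.result()

cluster.scale(0)            # cancels the running Slurm job
cluster.scale(1)            # submits a fresh Slurm job with a new worker
client.wait_for_workers(1)  # block until the new worker has registered

future = client.submit(do_one, 101)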

Ultimately, dask-jobqueue FooCluster classes just extend SpecCluster, so you could possibly play with the worker state of your SpecCluster instance to stop the Worker that was processing the task, but I'm not sure about that.

Do your Workers use only one process and one thread? If not, this can make things even more complicated.
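
If they don't, you can enforce that at cluster creation time (a minimal sketch using the standard cores and processes parameters of SLURMCluster):

cluster = SLURMCluster(
    queue='normal',
    cores=1,       # total cores per Slurm job
    processes=1,   # one worker process per job, so 1 thread per worker
    memory='10GB',
)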

Hi @guillaumeeb, thank you for the quick response!

I think I might have found a solution for my use case, simply by running scancel $SLURM_JOB_ID for the corresponding jobs; cluster.adapt then seems to take care of submitting new jobs to the Slurm queue:

import subprocess

from dask.distributed import as_completed

results = {future.key: None for future in futures}
for future in as_completed(futures):
    # Addresses of the worker(s) holding the finished future's result
    worker = list(client.who_has(future).values())[0]
    results[future.key] = future.result()
    # Read $SLURM_JOB_ID from the environment of that worker
    job_id = client.run(
        lambda: subprocess.check_output('echo $SLURM_JOB_ID', shell=True),
        workers=list(worker),
    )
    # check_output returns bytes like b'...\n'; strip the b'' wrapper and newline
    job_id = str(list(job_id.values())[0])[2:-3]
    # Cancel the Slurm job from the scheduler process, not from the worker itself
    client.run_on_scheduler(
        lambda: subprocess.run(f'scancel {job_id}', check=False, shell=True)
    )

Is there an easier way to figure out the $SLURM_JOB_ID?
And regarding your question: What difference does it make if the workers only use one process and one thread?

You can get it through cluster.workers. For example, on a supercomputer I have access to:

cluster.workers['SLURMCluster-0'].job_id
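
For example, to map worker names (as they appear in your logs, e.g. 'SLURMCluster-0') to their Slurm job IDs, assuming direct access to the cluster object:

job_ids = {name: job.job_id for name, job in cluster.workers.items()}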

If the workers use several threads or processes, you might have several tasks executing on the same worker at a given time. So if you kill this worker when one task is over, you might kill other tasks too.

Unfortunately, I don't have access to the cluster object, as it is running on a remote machine. Also, client.scheduler_info()['workers'] does not seem to contain the job_id. This is not a big deal for me though, and I am satisfied with the current workaround.
Thank you for your help!
