Hello!
@jschueller and I are new users of SLURMCluster, and we are trying to get the hang of it on a toy use case. Apologies in advance if I missed something obvious!
In our toy use case, we have 20 tasks, each of which completes in about 25 seconds, and we want to deploy them on 20 workers.
However, at the time we ran this use case, the cluster was almost saturated, and SLURM initially allocated only 10 of the 20 requested jobs.
Monitoring the Dask dashboard, we could see the first tasks being completed.
The first 10 tasks were then completed successfully. Eventually, however, the corresponding 10 running jobs hit their walltime, leaving only the 10 pending jobs (and no running workers).
Looking back at the dashboard, the already-completed tasks suddenly seemed to have been forgotten! We could still see that work had been done, but all 20 tasks were once again listed as “queued”.
Then, finally, the remaining 10 pending jobs were allocated and the corresponding workers started processing tasks again, but not only the ones that remained: previously completed tasks were redone from scratch!
My understanding is that the results of the completed tasks are stored by workers until all tasks are completed, and only then sent to the scheduler. This would explain why completed tasks are forgotten when all workers disappear due to the walltime. Is there a way to make workers send the results to the scheduler early?
Could you by any chance share your toy example? We at least need the API you are using.
If you are using Futures, for example, the results are not sent to the Scheduler. They are sent to the client (possibly through the Scheduler) when requested, for instance through a gather call.
So I’m assuming you didn’t try to get these results in your script.
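To illustrate, here is a minimal sketch of that behaviour (it assumes a simple local Client rather than your SLURM setup, purely for illustration): results stay in the memory of the worker that computed them until the client asks for them.

from dask.distributed import Client

client = Client(processes=False)  # assumption: local in-process cluster, only for this sketch

def inc(x):
    return x + 1

future = client.submit(inc, 41)  # the result stays in the worker's memory
value = future.result()          # only now is the value transferred to the client
values = client.gather([client.submit(inc, i) for i in range(5)])  # same, for many futures
print(value, values)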
Thanks for your response @guillaumeeb, here is a minimal reproducing script (although to actually reproduce, you need the HPC cluster to be saturated):
import time
import math

from dask_jobqueue import SLURMCluster
from dask.distributed import Client, print


def fake_load(duration=30):
    """Simulate CPU load"""
    start = time.time()
    while time.time() - start < duration:
        a = math.sqrt(64**5)


def toy_example(iteration):
    fake_load(25)  # Creates a fake load simulator for x sec.
    return -iteration


# define the inputs
size = 20
inputs = list(range(size))

# define SLURMCluster object
cluster = SLURMCluster(
    cores=1,
    memory="512 MB",
    walltime="00:00:40",
    name="dask_worker",
    job_extra_directives=[
        "--wckey=P120K:SALOME",
        "--output=logs/output.log",
        "--error=logs/error.log",
    ],
    interface="ib0",
)
print(f"> The dashboard link from the cluster : {cluster.dashboard_link}")

# submit to SLURM
cluster.scale(len(inputs))  # request one worker per task
# here, when the cluster is saturated, only 10 out of the 20 requested jobs are allocated by SLURM at first

# define the client and submit the tasks
client = Client(cluster)
futures = []
for x in inputs:
    futures.append(client.submit(toy_example, x))

outputs = client.gather(futures)
# if I understand correctly, 'client.gather' collects the results only after all tasks have been completed
# if the 10 originally available workers hit their walltime
# before the next 10 workers become available, all results are lost
print(outputs)
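As a side note on the saturation comment in the script above: the number of workers that actually joined can also be checked from the client side. This is only an illustrative sketch (scheduler_info and wait_for_workers are standard distributed Client methods, but these lines were not part of our run):

# sketch: check how many of the requested workers actually connected
n_connected = len(client.scheduler_info()["workers"])
print(f"{n_connected} worker(s) connected out of {len(inputs)} requested")

# optionally, block until at least one worker is up before submitting tasks
client.wait_for_workers(n_workers=1)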
Here is the output of the squeue command on the cluster at 2 different times.
At first, when 10 out of the 20 requested jobs are allocated:
Afterwards, when the first 10 jobs have expired and the last 10 are still pending (this is when all results computed during the first 10 jobs are “forgotten”):
If I understand correctly, as_completed collects the results in the order in which they become available. @jschueller and I did try this, but I think some tasks still ended up running twice (with all of the original workers down and the new ones not yet up, the client probably forgot which tasks had already been run and submitted them again to the new workers once they came up).
To confirm this, I have changed the script to use as_completed and to be more verbose:
import time
import math

from dask_jobqueue import SLURMCluster
from dask.distributed import Client, print, as_completed


def fake_load(duration=30):
    """Simulate CPU load"""
    start = time.time()
    while time.time() - start < duration:
        a = math.sqrt(64**5)


def toy_example(iteration):
    fake_load(55)  # Creates a fake load simulator for x sec.
    print(f"After fake_load in toy_example {iteration}")
    return -iteration


# define the inputs
size = 20
inputs = list(range(size))

# define SLURMCluster object
cluster = SLURMCluster(
    cores=1,
    memory="512 MB",
    walltime="00:01:00",
    name="dask_worker",
    job_extra_directives=[
        "--wckey=P120K:SALOME",
        "--output=logs/output.log",
        "--error=logs/error.log",
    ],
    interface="ib0",
)
print(f"> The dashboard link from the cluster : {cluster.dashboard_link}")
print(cluster.job_script())

# submit to SLURM
cluster.scale(len(inputs))  # request one worker per task
# here, when the cluster is saturated, only 10 out of the 20 requested jobs are allocated by SLURM at first

# define the client and submit the tasks
client = Client(cluster)
futures = client.map(toy_example, inputs)

order_completed = []
for future in as_completed(futures):
    res = future.result()
    print("result = ", res)
    order_completed.append(res)
print("Outputs in the completion order = ", order_completed)

outputs = client.gather(futures)
print("Outputs in the correct order = ", outputs)
And here is the console output with the error produced:
> The dashboard link from the cluster : http://10.130.56.193:8787/status
#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=489M
#SBATCH -t 00:01:00
#SBATCH --wckey=P120K:SALOME
#SBATCH --output=logs/output.log
#SBATCH --error=logs/error.log
/scratch/users/C39575/python_forge/envs/hpc/bin/python -m distributed.cli.dask_worker tcp://10.130.56.193:44461 --name dummy-name --nthreads 1 --memory-limit 488.28MiB --nanny --death-timeout 60 --interface ib0
After fake_load in toy_example 0
result = 0
After fake_load in toy_example 2
result = -2
After fake_load in toy_example 4
After fake_load in toy_example 6
After fake_load in toy_example 8
After fake_load in toy_example 10
After fake_load in toy_example 14
After fake_load in toy_example 12
After fake_load in toy_example 16
result = -4
result = -6
result = -8
result = -10
result = -12
result = -16
result = -14
After fake_load in toy_example 0
After fake_load in toy_example 2
After fake_load in toy_example 4
After fake_load in toy_example 10
After fake_load in toy_example 6
After fake_load in toy_example 8
After fake_load in toy_example 12
After fake_load in toy_example 14
After fake_load in toy_example 16
2025-05-02 18:37:14,363 - distributed.scheduler - ERROR - Task toy_example-163de7e251191abc08ad7d988020ed90 marked as failed because 4 workers died while trying to run it
Traceback (most recent call last):
File "essai_debug.py", line 49, in <module>
res = future.result()
^^^^^^^^^^^^^^^
File "python_forge/envs/hpc/lib/python3.11/site-packages/distributed/client.py", line 402, in result
return self.client.sync(self._result, callback_timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "python_forge/envs/hpc/lib/python3.11/site-packages/distributed/client.py", line 410, in _result
raise exc.with_traceback(tb)
distributed.scheduler.KilledWorker: Attempted to run task 'toy_example-163de7e251191abc08ad7d988020ed90' on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.130.51.125:43607. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
Clearly, some computations ran twice despite the use of as_completed, and others did not manage to run at all. Here is what the dashboard looked like shortly before the error:
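As a side note, and without explaining or preventing the re-computation itself, the collection loop could at least be made to survive the KilledWorker error instead of aborting. A minimal sketch of what I mean (retries= is an optional keyword of client.map; these lines are an illustration, not what we actually ran):

from distributed.scheduler import KilledWorker

# ask the scheduler to retry lost tasks a couple of times before giving up on them
futures = client.map(toy_example, inputs, retries=2)

order_completed = []
for future in as_completed(futures):
    try:
        order_completed.append(future.result())
    except KilledWorker as exc:
        # keep going even if a task was abandoned after too many worker deaths
        print(f"task {future.key} was lost: {exc}")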