Hi Dask Community,

I'm encountering `KilledWorker` errors when using adaptive scaling (`cluster.adapt()`) with `dask_jobqueue.SLURMCluster` (via a wrapper), but the same setup works fine when using fixed scaling (`cluster.scale()`).
Setup:

- Cluster: SLURM cluster managed via `dask-jobqueue`.
- Job Submission: Using a Python script with `dask.distributed.Client`.
- Worker Environment: Workers are configured to activate a specific conda/virtual environment and set a `PYTHONPATH` using a custom shell script provided via `job_script_prologue` or a similar mechanism (details below).
Problem:

When I define the cluster and then enable adaptive scaling, submitting simple tasks leads to `KilledWorker` errors after some tasks complete successfully. The scheduler logs indicate workers died while trying to run tasks.

However, if I comment out `cluster.adapt()` and instead use `cluster.scale(jobs=N)` (or `workers=N*processes`) to request a fixed number of workers upfront, the exact same tasks run without any `KilledWorker` errors.
Code Example:

```python
# Assuming 'ClusterManager.create_cluster' wraps SLURMCluster setup
# from my_utils import ClusterManager
from dask_jobqueue import SLURMCluster  # More direct example
from dask.distributed import Client, as_completed
import time
import random
import os

# --- Worker Environment Setup Script (worker_setup.sh) ---
# This content is passed via job_script_prologue or similar
worker_setup_script = """
#!/bin/bash
echo "Worker starting on $(hostname)"
# Activate conda/virtual environment
source /path/to/my_shared_env/bin/activate
# Set custom PYTHONPATH if needed
export PYTHONPATH="/path/to/custom/code:/path/to/other/libs:$PYTHONPATH"
echo "Environment sourced, Python: $(which python)"
echo "PYTHONPATH: $PYTHONPATH"
"""

# --- SLURM Cluster Configuration ---
# These are the effective parameters passed to SLURMCluster
cluster_params = {
    'queue': 'my_slurm_partition',
    'cores': 5,          # Cores per SLURM job
    'memory': '25GB',    # Memory per SLURM job
    'processes': 3,      # Dask workers per SLURM job (~8.3GB RAM, ~1 core/thread per worker)
    'python': '/path/to/my_shared_env/bin/python',
    'job_script_prologue': worker_setup_script.splitlines(),
    # Add other relevant options like walltime, log_directory etc.
    'walltime': '01:00:00',
    'log_directory': 'dask_worker_logs',  # Important for debugging!
    'job_name': 'dask-adaptive-test',
}
print("Cluster Config:", cluster_params)

# --- Create Cluster and Client ---
cluster = SLURMCluster(**cluster_params)
client = Client(cluster)
print(f"Dask Dashboard Link: {client.dashboard_link}")

print("Using adaptive scaling...")
cluster.adapt(maximum_jobs=30)

# --- Simple Task Definition ---
def simple_task():
    sleep_time = random.randint(5, 10)
    time.sleep(sleep_time)
    return 1
# --- Submit Tasks and Gather Results ---
N_TASKS = 100
print(f"Submitting {N_TASKS} simple tasks...")
# pure=False so each submit gets its own key (identical pure submits would collapse into one task)
futures = [client.submit(simple_task, pure=False) for _ in range(N_TASKS)]

print("Waiting for results...")
for f in as_completed(futures):
    try:
        print(f.result())
    except Exception as exc:
        print(f"ERROR processing future {f}: {exc!r}")

print("Shutting down...")
client.close()
cluster.close()
print("Client and cluster closed.")
```
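For comparison, this is roughly the fixed-scaling variant that runs cleanly; the only change is swapping the `adapt()` call for `scale()` (the job count here is an arbitrary placeholder):

```python
# Fixed scaling instead of cluster.adapt(maximum_jobs=30):
# request a fixed number of SLURM jobs up front (with processes=3, N jobs -> N * 3 workers).
cluster.scale(jobs=10)        # or cluster.scale(30) to ask for 30 worker processes directly
client.wait_for_workers(30)   # optional: block until the workers have actually connected
```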
Error Output (when using `cluster.adapt()`):
(Some initial tasks might succeed)

```
1
1
... (more successes) ...
```

(Then errors appear in the client script)

```
ERROR processing future <Future: status=error, key=simple_task-...>: KilledWorker("Attempted to run task 'simple_task-...' on 4 different workers, but all those workers died while trying to run it. The last worker that attempt to run the task was tcp://<worker-ip>:<port>. Inspecting worker logs is often a good next step...")
Shutting down...
Client and cluster closed.
```
Scheduler Logs show:

```
... - distributed.scheduler - ERROR - Task simple_task-... marked as failed because X workers died while trying to run it
```
Worker Logs (`dask_worker_logs/dask-adaptive-test_*.err` or `*.out`):

```
slurmstepd-hsim-s49a12: error: *** JOB 58688261 ON hsim-s49a12 CANCELLED AT 2025-04-28T12:08:29 ***
slurmstepd-hsim-s74b18: error: *** JOB 60560569 ON hsim-s74b18 CANCELLED AT 2025-04-28T12:08:33 ***
slurmstepd-hsim-s74b18: error: *** JOB 60560651 ON hsim-s74b18 CANCELLED AT 2025-04-28T12:08:37 ***
slurmstepd-hsim-s73b18: error: *** JOB 60560685 ON hsim-s73b18 CANCELLED AT 2025-04-28T12:08:41 ***
slurmstepd-hsim-s69b3: error: *** JOB 60560182 ON hsim-s69b3 CANCELLED AT 2025-04-28T12:08:17 ***
slurmstepd-hsim-s45a20: error: *** JOB 58719358 ON hsim-s45a20 CANCELLED AT 2025-04-28T12:08:17 ***
slurmstepd-hsim-s62a8: error: *** JOB 58719524 ON hsim-s62a8 CANCELLED AT 2025-04-28T12:08:21 ***
slurmstepd-hsim-s58a15: error: *** JOB 58586616 ON hsim-s58a15 CANCELLED AT 2025-04-28T12:08:25 ***
```
Observations & Question:

- The `KilledWorker` error suggests the worker processes are terminating unexpectedly.
- Given that fixed scaling (`cluster.scale()`) works, the issue seems related to the dynamics of adaptive scaling. Perhaps the rapid concurrent startup of many workers/jobs via `adapt()` triggers resource limits (memory spikes, network filesystem contention during environment setup, scheduler load?) that aren't hit with a more gradual fixed scaling.
- The memory per worker (25GB / 3 processes ≈ 8.3GB) might be tight, but it seems sufficient when workers are started via `scale()` (a quick way to check the per-worker limits from the client is sketched after this list).
- I also see `RuntimeWarning: coroutine 'AsyncDaskSLURMJob._close_job' was never awaited` warnings, but these often seem unrelated to the core `KilledWorker` problem.
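For reference, a minimal sketch of how I can double-check what each connected worker actually reports as its memory limit and thread count, using `Client.scheduler_info()` on the client from the example above:

```python
# Print the memory limit and thread count each connected worker reports.
# With memory='25GB' and processes=3 per job, I expect roughly 8.3 GB per worker.
info = client.scheduler_info()
for addr, w in info["workers"].items():
    print(addr, w["nthreads"], "threads,", round(w["memory_limit"] / 1e9, 1), "GB memory limit")
```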
Has anyone encountered similar issues where `adapt()` causes instability while `scale()` is fine? What specific aspects of adaptive scaling might trigger worker failures (like OOM kills) that wouldn't happen with fixed scaling? Any tips on further debugging this?
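For concreteness, one direction I'm wondering about is throttling adaptive scaling so jobs aren't started and retired in rapid bursts, along these lines (untested sketch; I'm assuming the extra keyword arguments are forwarded to `distributed.deploy.Adaptive`):

```python
# Untested idea: keep a small floor of jobs and make adapt react more slowly.
cluster.adapt(
    minimum_jobs=2,     # keep a couple of SLURM jobs alive instead of scaling to zero
    maximum_jobs=30,
    interval="10s",     # how often the adaptive logic re-evaluates the target
    wait_count=10,      # consecutive "scale down" suggestions required before retiring workers
)
```

Would this be the right knob to turn, or is the instability more likely an environment/startup issue?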
Environment:

- dask: 2025.3.0
- distributed: 2025.3.0
- dask-jobqueue: 0.9.0
- Python: 3.10.12
- OS: Linux
Thanks for any insights!