KilledWorker Errors with SLURMCluster `adapt()` but not `scale()`

Hi Dask Community,
I’m encountering KilledWorker errors when using adaptive scaling (cluster.adapt()) with dask_jobqueue.SLURMCluster (via a wrapper), but the same setup works fine when using fixed scaling (cluster.scale()).
Setup:

  • Cluster: SLURM cluster managed via dask-jobqueue.
  • Job Submission: Using a Python script with dask.distributed.Client.
  • Worker Environment: Workers are configured to activate a specific conda/virtual environment and set a PYTHONPATH using a custom shell script provided via job_script_prologue or similar mechanism (details below).
Problem:
When I define the cluster and then enable adaptive scaling, submitting simple tasks leads to KilledWorker errors after some tasks complete successfully. The scheduler logs indicate workers died while trying to run tasks.
However, if I comment out cluster.adapt() and instead use cluster.scale(jobs=N) (or workers=N*processes) to request a fixed number of workers upfront, the exact same tasks run without any KilledWorker errors.

Code Example:
# Assuming 'ClusterManager.create_cluster' wraps SLURMCluster setup
# from my_utils import ClusterManager
from dask_jobqueue import SLURMCluster # More direct example
from dask.distributed import Client, as_completed
import time
import random
import os
# --- Worker Environment Setup Script (worker_setup.sh) ---
# This content is passed via job_script_prologue or similar
worker_setup_script = """
#!/bin/bash
echo "Worker starting on $(hostname)"
# Activate conda/virtual environment
source /path/to/my_shared_env/bin/activate
# Set custom PYTHONPATH if needed
export PYTHONPATH="/path/to/custom/code:/path/to/other/libs:$PYTHONPATH"
echo "Environment sourced, Python: $(which python)"
echo "PYTHONPATH: $PYTHONPATH"
"""
# --- SLURM Cluster Configuration ---
# These are the effective parameters passed to SLURMCluster
cluster_params = {
    'queue': 'my_slurm_partition',
    'cores': 5,                     # Cores per SLURM job
    'memory': '25GB',               # Memory per SLURM job
    'processes': 3,                 # Dask workers per SLURM job (~8.3GB RAM, ~1 core/thread per worker)
    'python': '/path/to/my_shared_env/bin/python',
    'job_script_prologue': worker_setup_script.splitlines(),
    # Add other relevant options like walltime, log_directory etc.
    'walltime': '01:00:00',
    'log_directory': 'dask_worker_logs', # Important for debugging!
    'job_name': 'dask-adaptive-test',
}
print("Cluster Config:", cluster_params)
# --- Create Cluster and Client ---
cluster = SLURMCluster(**cluster_params)
client = Client(cluster)
print(f"Dask Dashboard Link: {client.dashboard_link}")
print("Using adaptive scaling...")
cluster.adapt(maximum_jobs=30)
# --- Simple Task Definition ---
def simple_task():
    sleep_time = random.randint(5, 10)
    time.sleep(sleep_time)
    return 1
# --- Submit Tasks and Gather Results ---
N_TASKS = 100
print(f"Submitting {N_TASKS} simple tasks...")
futures = [client.submit(simple_task, pure=False) for _ in range(N_TASKS)]  # pure=False so each call gets its own key instead of deduplicating
print("Waiting for results...")
for f in as_completed(futures):
    print(f.result())

Error Output (When using cluster.adapt()):
(Some initial tasks might succeed)

1
1
... (more successes) ...

(Then errors appear in the client script)

ERROR processing future <Future: status=error, key=simple_task-...>: KilledWorker("Attempted to run task 'simple_task-...' on 4 different workers, but all those workers died while trying to run it. The last worker that attempt to run the task was tcp://<worker-ip>:<port>. Inspecting worker logs is often a good next step...")
Shutting down...
Client and cluster closed.

Scheduler Logs show:

... - distributed.scheduler - ERROR - Task simple_task-... marked as failed because X workers died while trying to run it

Worker Logs (dask_worker_logs/dask-adaptive-test_*.err or *.out):
slurmstepd-hsim-s49a12: error: *** JOB 58688261 ON hsim-s49a12 CANCELLED AT 2025-04-28T12:08:29 ***
slurmstepd-hsim-s74b18: error: *** JOB 60560569 ON hsim-s74b18 CANCELLED AT 2025-04-28T12:08:33 ***
slurmstepd-hsim-s74b18: error: *** JOB 60560651 ON hsim-s74b18 CANCELLED AT 2025-04-28T12:08:37 ***
slurmstepd-hsim-s73b18: error: *** JOB 60560685 ON hsim-s73b18 CANCELLED AT 2025-04-28T12:08:41 ***
slurmstepd-hsim-s69b3: error: *** JOB 60560182 ON hsim-s69b3 CANCELLED AT 2025-04-28T12:08:17 ***
slurmstepd-hsim-s45a20: error: *** JOB 58719358 ON hsim-s45a20 CANCELLED AT 2025-04-28T12:08:17 ***
slurmstepd-hsim-s62a8: error: *** JOB 58719524 ON hsim-s62a8 CANCELLED AT 2025-04-28T12:08:21 ***
slurmstepd-hsim-s58a15: error: *** JOB 58586616 ON hsim-s58a15 CANCELLED AT 2025-04-28T12:08:25 ***
Observations & Question:

  • The KilledWorker error suggests the worker processes are terminating unexpectedly.
  • Given that fixed scaling (cluster.scale()) works, the issue seems related to the dynamics of adaptive scaling. Perhaps the rapid concurrent startup of many workers/jobs via adapt() triggers resource limits (memory spikes, network filesystem contention during environment setup, scheduler load?) that aren't hit when scaling to a fixed size upfront. I'm considering throttling the adaptive controller to test this; see the sketch after this list.
  • The memory per worker (25GB / 3 processes ≈ 8.3GB) might be tight, but it seems sufficient when workers are started via scale().
  • I also see RuntimeWarning: coroutine 'AsyncDaskSLURMJob._close_job' was never awaited warnings, but these seem unrelated to the core KilledWorker problem.

Has anyone encountered similar issues where adapt() causes instability while scale() is fine? What specific aspects of adaptive scaling might trigger worker failures (like OOM kills) that wouldn't happen with fixed scaling? Any tips on further debugging this?
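
To test the concurrent-startup theory, one thing I plan to try is slowing the adaptive controller down. This is only a sketch: maximum_jobs matches my current call, while minimum_jobs, interval, and wait_count are guesses I'd pass through to distributed's Adaptive.

# Instead of cluster.adapt(maximum_jobs=30):
cluster.adapt(
    minimum_jobs=1,     # keep at least one SLURM job alive
    maximum_jobs=30,
    interval="10s",     # evaluate scaling decisions less frequently
    wait_count=12,      # ~2 minutes of consistent signals before retiring workers
)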

Environment:

  • dask: 2025.3.0
  • distributed: 2025.3.0
  • dask-jobqueue: 0.9.0
  • Python: 3.10.12
  • OS: Linux

Thanks for any insights!

Hi @Rama_Krishna_Reddy, welcome to the Dask community!

Looking at your basic example, I'm not sure what could be killing the jobs and making your workflow fail. We probably need more logs, if you can find them:

  • Scheduler/client logs showing how the adaptive logic is scaling jobs up and down,
  • Worker logs showing what happened, ideally with the stack trace of a single task failure (one way to grab these is sketched below).
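
Something along these lines on the client side should surface the adaptive decisions and pull worker logs back without digging through the SLURM output files. It's an untested sketch: the logger names and the get_logs()/get_worker_logs() calls come from distributed/dask-jobqueue, so adapt it to your wrapper.

import logging

# Turn up the loggers that record adaptive scale-up/scale-down decisions
logging.getLogger("distributed.deploy.adaptive").setLevel(logging.DEBUG)
logging.getLogger("distributed.scheduler").setLevel(logging.INFO)

# ... create the cluster and client exactly as in your example ...

# After (or while) tasks fail, pull logs from workers that are still alive:
# print(cluster.get_logs())        # logs known to the cluster object (scheduler + workers)
# print(client.get_worker_logs())  # logs from workers currently connected to the scheduler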

About the rapid concurrent startup of many workers/jobs: I don't think this should be an issue. Your toy example doesn't need many resources, and standard scaling also submits a lot of jobs at the beginning.

As for the memory per worker, it doesn't seem tight to me, especially considering your tasks.
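
If you want to double-check what each worker process actually gets, you can print the job script that dask-jobqueue generates; the #SBATCH directives and the per-worker memory limit should be visible in the worker command. A minimal sketch reusing your parameters:

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(queue="my_slurm_partition", cores=5, memory="25GB", processes=3)
print(cluster.job_script())  # inspect the generated #SBATCH header and worker command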

There are known issues with adaptive scaling, and it's hard to get right, but it should work fine with a simple example like this one.