Dask Adaptive Mode problems

Hello Dask Community!

I am facing two problems. We have a service that uses Dask DataFrame for its operations, and we always ran some large-dataset tests (1 million and 2 million rows). We always used static mode: 4 workers with spilling to disk, a 2GB Pod memory limit and a 1GB Dask worker limit, which was always fine. But now we are trying to use adaptive mode deployed in a Kubernetes cluster. Both our Dask version and the Kubernetes operator are 2025.4.1.

When we execute a really heavy CPU job with scaling from 2 to 6 workers, for example, a worker dies because of the CPU load.
This is the log from the worker:

[INFO] Remove worker addr: tcp://10.1.80.131:40681 name: lm-cluster-default-worker-cf2b1454b5 (stimulus_id='handle-worker-cleanup-1754048275.9446254')
distributed.scheduler: scheduler.py:5512 [ERROR] Task ('chunk-10cd24fe0994cd4a8ff6cd05dfe6c480', 31) marked as failed because 1 workers died while trying to run it

I worked around the issue by using 3 threads per worker and `worker-ttl: 5m`.
It doesn't make sense to me to set `worker-ttl` really high, since a worker should always have a thread free to send heartbeats to the scheduler. If I remove it, workers start crashing. There is no way to say "please reserve 1 thread for communicating with the scheduler only". Am I missing something in the configuration?
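For reference, the workaround above maps to roughly the following settings (a sketch only; `worker-ttl` is a scheduler-side config key, while the thread count is passed when starting the worker):

    distributed:
      scheduler:
        worker-ttl: 5m   # tolerate up to 5 minutes without a heartbeat before removing a worker

with the workers then started along the lines of `dask worker tcp://<scheduler>:8786 --nthreads 3 --memory-limit 1GB`.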

Question 2:

Can someone please tell me what is going on: why is this worker being killed while it still has tasks processing and data in memory? It seems that a worker is trying to get data from another one, but that one is already dead.

LOG FROM WORKER:

[ERROR] failed during get data with tcp://10.1.80.185:42683 -> tcp://10.1.80.137:46005
<TCP (closed) local=tcp://10.1.80.185:42683 remote=tcp://10.1.80.137:56020>: ConnectionResetError: [Errno 104] Connection reset by peer
[INFO] Lost connection to 'tcp://10.1.80.137:56020'

LOG FROM SCHEDULER:

[INFO] Worker status running -> closing - <WorkerState 'tcp://10.1.80.137:46005', name: lm-cluster-default-worker-efa2a42e29, status: closing, memory: 59, processing: 3>
[INFO] Received 'close-stream' from tcp://10.1.80.137:40236; closing.
[INFO] Remove worker addr: tcp://10.1.80.137:46005 name: lm-cluster-default-worker-efa2a42e29 (stimulus_id='handle-worker-cleanup-1754401644.3424878')
[ERROR] Task ('add-b48bae854041d9f44435a6df4586833f', 72) marked as failed because 1 workers died while trying to run it
[ERROR] Task ('operation-1d6fd89bb12125bdea75ea6fd00a50cb', 70) marked as failed because 1 workers died while trying to run it
[WARNING] Removing worker 'tcp://10.1.80.137:46005' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('operation-011c3dbd3d6b2bbfb90c31f1ff44a05c', 14), ('operation-011c3dbd3d6b2bbfb90c31f1ff44a05c', 20), …
[DEBUG] Removed worker <WorkerState 'tcp://10.1.80.137:46005', name: lm-cluster-default-worker-efa2a42e29, status: closed, memory: 0, processing: 0>

Thank you in advance

Hi @Teodor_Chakarov, welcome to Dask community!

First, and just to be sure: you don't have any problem when you are using a static cluster configuration, so without autoscaling?

Could you also give the exact way/configuration you use when starting the Dask cluster in the two modes? 2GB of memory looks kind of small.

The only thought that comes to mind is a GIL-holding problem that prevents the Worker from communicating, but I agree this sounds weird, especially since you don't run into this issue without autoscaling. It might be that the autoscaling configuration is wrong.

Again, if it only occurs when autoscaling is on, the cause might be too aggressive downscaling or worker removal…

Hello and sorry for the late response!

First, I will say that I don't face the issue when running in static mode (4 workers). In the dask-cm ConfigMap I use: config.yml: |

    distributed:
      worker:
        preload: /app/openapi_server/init_dask_worker.py
        memory:
          spill-compression: auto
          rebalance:
            measure: managed
          target: 0.80     # fraction of managed memory where we start spilling to disk
          spill: 0.80      # fraction of process memory where we start spilling to disk
          pause: false      # fraction of process memory at which we pause worker threads
          terminate: false  # fraction of process memory at which we terminate the worker
      scheduler:
        preload: /app/openapi_server/init_dask_scheduler.py
        allowed-failures: 0
        worker-ttl: 10m
        active-memory-manager:
          interval: 2s
          measure: managed
        http:
          routes:
            - distributed.http.scheduler.prometheus
            - distributed.http.scheduler.api
            - distributed.http.health             
            - distributed.http.statics            
            - distributed.http.proxy
            - distributed.http.scheduler.json
            - distributed.http.scheduler.info
      nanny:
        pre-spawn-environ:
          malloc-trim-threshold-: 0
    adaptive:
      interval: 5s        
      target-duration: 5m
      wait-count: 36
    dataframe:
      dtype-backend: pyarrow
      convert-string: false
      shuffle:
        method: p2p

I use `dask worker` with 1 thread, a 1GB memory limit, and a 2GB pod limit so it has time to start spilling. And this is the cluster config:

    clusterConfig:
      # adaptive config
      mode: 'adaptive'
      adaptive:
        name: lm-cluster
        workers:
          min: 4
          max: 8
        interval: 5s         # How often the scheduler checks whether to scale up or down.
        target_duration: 10m # How many intervals a worker must be idle before it's removed.
        wait_count: 36       # Desired time to complete tasks; controls how aggressively to scale up. For longer tasks adjust it higher.

Good to know: sometimes the job runs successfully, but not always. Sometimes it just kills a worker that is then not able to transfer the needed data. Here `target_duration` is 10m and I am talking about a job which takes 6 minutes at most, so even if a worker is considered idle due to the high computation pressure, it still shouldn't be killed like that.

It looks like the adaptive mechanism is killing workers that still have ongoing tasks or hold objects in memory, and not gracefully. This is a bit weird, as it shouldn't.

One thing I notice is that I'm not sure you understood the adaptive settings correctly:

  • `target_duration`: the amount of time in which you would like your computation to finish. Dask will try to throw in as many workers as it can in order to achieve this goal.
  • `wait_count`: the number of checks, separated by the `interval` delay, before really deciding to retire a worker.

With your last settings, a Worker should be idle for 180s (36 checks × 5s) before being retired.
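To make the arithmetic concrete, here is a small sketch in plain Python, using the numbers from the config above. The scale-up formula is a deliberate simplification of what the scheduler actually does, and the total task time is a hypothetical value:

```python
import math

# Numbers taken from the adaptive config above.
interval_s = 5    # seconds between adaptive checks
wait_count = 36   # consecutive "scale down" checks before a worker is retired

# Scale-down: a worker must look idle for this long before being retired.
idle_before_retire_s = interval_s * wait_count
print(idle_before_retire_s)  # 180, i.e. 3 minutes

# Scale-up (simplified): Dask aims for enough workers that the queued work
# would finish within target_duration. Hypothetical workload: 60 minutes of
# total task time against the 10-minute target from the config.
total_task_time_s = 60 * 60
target_duration_s = 10 * 60
workers_wanted = math.ceil(total_task_time_s / target_duration_s)
print(workers_wanted)  # 6
```

So with these settings, scale-down needs 3 minutes of sustained idleness, which is why a 6-minute job should not normally lose busy workers.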

What happens if you leave default settings, is it worse?

The best way forward would be to have some reproducible example, but I think this would be hard. Could you try to give more information about your workflow: how many tasks does it generate, how long does each task take to execute, and how long does the entire computation take?
