Kubernetes Operator + AutoScaler Losing Tasks / Workers

Hi everyone, we are trying out the new Kubernetes Operator-based KubeCluster for a workload that needs adaptive scaling. So far it is amazing; however, we have an issue with adaptive scaling that causes tasks to get lost. Here is how we provision our cluster and connect to it:

import distributed
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="my-dask-cluster",
    image="ghcr.io/dask/dask:2022.12.0-py3.10",
    n_workers=4,
)

cluster.adapt(minimum=4, maximum=32)  # adaptive scaling between 4 and 32 workers
client = distributed.Client(cluster)

which creates the following K8s objects as expected:
[screenshot of the resulting Kubernetes objects]

I run a simple Dask Array workflow that generates pseudo-random numbers and pulls a slice back to my client, like this:

import dask.array as da

arr = da.random.normal(size=(4096, 16384, 4096), chunks=(128, 512, 512)).astype('float32')
arr_slice = arr[0].compute()  # pull the first slice back to the client as a NumPy array
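
For scale, that array works out to exactly 1 TiB of float32 data in 128 MiB chunks (8192 chunks in total); a quick sanity check with plain Dask, no cluster needed:

import dask.array as da

arr = da.random.normal(size=(4096, 16384, 4096), chunks=(128, 512, 512)).astype('float32')
print(arr.nbytes / 2**40)                  # 1.0 TiB in total
print(arr.numblocks)                       # (32, 32, 8) chunk grid, 8192 chunks
print(arr.blocks[0, 0, 0].nbytes / 2**20)  # 128 MiB per chunk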

When we watch the dashboard it all looks fine until we start losing workers or tasks:

  1. Tasks start running on the 4 base workers.
  2. More workers spin up and get added to the cluster.
  3. As more workers come up, some of them start dying and their tasks get re-assigned.
  4. Eventually the workers either finish the job, if we are lucky, or we error out with this:
KilledWorker: Attempted to run task ('astype-normal-getitem-60d7c97bdab5f20901029ce4f6d7bc5c', 15, 5) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.19.33.132:46341. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

We followed the link, but it doesn’t apply to any of the issues we are seeing. We can’t recover any information about why or how the workers die.
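
For context, KilledWorker is raised once a task has been on too many dying workers; the threshold is the distributed.scheduler.allowed-failures config value (default 3). A hedged sketch of raising that threshold so the job survives more worker churn while debugging; this assumes the env passed to KubeCluster reaches the scheduler pod, which may not hold for every operator version:

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="my-dask-cluster",
    image="ghcr.io/dask/dask:2022.12.0-py3.10",
    n_workers=4,
    # The scheduler reads this at start-up; it cannot be changed from the
    # client once the scheduler pod is running. If env does not reach the
    # scheduler pod, set the variable on the scheduler container instead.
    env={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "10"},
)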

Given how trivial this example is, we are puzzled as to why this is happening. We checked the Dask Kubernetes and dashboard logs, and typically they just say “Removing worker …” followed by new workers being added, etc.

In some cases, an exception is thrown saying the tasks could not be retrieved from the workers. Are we doing something wrong here, or is there a bug?

We have also tried the recreate_task_locally() function, but it doesn’t give us any error information; it just returns the computed values as a NumPy array. Also confusing :slight_smile:
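
That behaviour is consistent with what those helpers do: they re-run the blamed task in the local process. A minimal sketch of the error-path variant (standard distributed.Client API, using the same arr as above):

future = client.compute(arr[0])

try:
    future.result()
except Exception:
    # Re-runs the blamed task in this process; if the task's own code is
    # at fault, the same exception is raised here with a usable traceback.
    client.recreate_error_locally(future)

If nothing is raised there, and recreate_task_locally() hands the computed chunk straight back as it did for us, that points at the workers being killed externally (memory pressure, pod eviction, scale-down) rather than at the task code.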

A little more information about our setup:

  • Google Cloud
  • Google Kubernetes Engine
  • We turned on Autopilot
  • Running behind a VPC
  • Using the default ClusterIP service or an internal LoadBalancer yields the same results.
  • If we don’t turn on adaptive scaling and instead scale by hand to many (hundreds of) pods, everything works fine (see the sketch after this list).
  • We can scale up or down manually without any issues.
  • Operator, Dask, and Distributed are all at 2022.12.0.
  • The client, scheduler, and workers are all on the same version.
  • LocalCluster works fine too.
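
The manual-scaling path mentioned above is just the standard scale() call on the same cluster object; roughly:

# Instead of cluster.adapt(minimum=4, maximum=32):
cluster.scale(32)             # fixed pool of 32 workers, no adaptive scaling
arr_slice = arr[0].compute()
cluster.scale(4)              # shrink back down by hand afterwards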

We think Autopilot may be the culprit here, but in theory it shouldn’t conflict with anything Dask is doing; it just manages the node pools that Dask asks for. However, we are about to try a new deployment without Autopilot to test this.

Thanks in advance!
Altay

We can’t recover any information about why or how the workers die.

Can you watch the scheduler and worker pod logs on your Kubernetes cluster? This will be the best place to figure out what is going on.

Hi @jacobtomlinson,

We have been trying, but it is hard to get to the worker logs after the pods have been terminated.
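
In the meantime, logs can also be pulled over the Dask comm while the workers are still registered; these are standard distributed.Client calls, nothing operator-specific:

scheduler_logs = client.get_scheduler_logs()
worker_logs = client.get_worker_logs()   # dict: worker address -> [(level, message), ...]

for addr, records in worker_logs.items():
    print(addr)
    for level, message in records:
        print("  ", level, message)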

However, on the scheduler this is the error:

2022-12-08 17:56:15,183 - distributed.scheduler - ERROR - Shut down workers that don't have promised key: [], ('transpose-37d5309538e5918c286a45e5863fa247', 6, 23)

Sounds like the scheduler is killing them because the workers lose the keys somehow? This only happens on newly added worker pods with the DaskAutoscaler (i.e. .adapt). If I manually scale without the autoscaler, this is not an issue.
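
For anyone following along, the standard client calls for checking where keys actually live during scale-up look roughly like this:

print(client.who_has())                          # key -> workers currently holding it
print(client.has_what())                         # worker address -> keys it holds
print(list(client.scheduler_info()["workers"]))  # workers the scheduler currently knows about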

I will post more updates if we can get to the worker logs :slight_smile:

@jacobtomlinson

Another interesting thing: the classic KubeCluster works fine, but the operator-based one doesn’t.

We are still investigating.

I wonder if anyone in the community can reproduce this with the operator.

I can reproduce this with minikube on a local Kubernetes cluster just by using the demo from the documentation. I don’t think it is related to our GCP deployment.
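
Roughly, the reproducer boils down to the snippet from the top of this thread:

import distributed
import dask.array as da
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="my-dask-cluster",
    image="ghcr.io/dask/dask:2022.12.0-py3.10",
    n_workers=4,
)
cluster.adapt(minimum=4, maximum=32)
client = distributed.Client(cluster)

arr = da.random.normal(size=(4096, 16384, 4096), chunks=(128, 512, 512)).astype('float32')
arr_slice = arr[0].compute()   # KilledWorker once adaptive scale-up starts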

I will open an issue in the repo.