Hi everyone, we are trying out the new Kubernetes Operator-based KubeCluster for a workload that needs adaptive scaling. So far it is amazing; however, we have an issue with adaptive scaling that causes tasks to get lost. Here is how we provision our cluster and connect to it:
import distributed
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="my-dask-cluster",
    image="ghcr.io/dask/dask:2022.12.0-py3.10",
    n_workers=4,
)
cluster.adapt(4, 32)  # adaptive scaling between 4 and 32 workers
client = distributed.Client(cluster)
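For reference, a minimal sanity check we can add before submitting work, using only distributed's standard client API (not something the failure depends on):
# Block until the 4 base workers have registered with the scheduler
client.wait_for_workers(4)

# Dashboard link we keep open while the workload runs
print(client.dashboard_link)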
which creates the K8s objects we expect.
We run a simple Dask Array workflow to generate pseudo-random numbers and pull a slice back to the client like this:
import dask.array as da

# Lazily build a ~1 TiB float32 array in ~128 MiB chunks
arr = da.random.normal(size=(4096, 16384, 4096), chunks=(128, 512, 512)).astype('float32')

# Pull the first 2D slice (~256 MiB) back to the client
arr_slice = arr[0].compute()
When we watch the dashboard it all looks fine until we start losing workers or tasks:
- Tasks start running on the 4 base workers.
- More workers spin up and get added to the cluster.
- As more workers keep arriving, some of them start dying and their tasks get re-assigned.
- Eventually the job either finishes if we are lucky, or it errors out with this:
KilledWorker: Attempted to run task ('astype-normal-getitem-60d7c97bdab5f20901029ce4f6d7bc5c', 15, 5) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://10.19.33.132:46341. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
We followed the link, but it doesn’t cover any of the issues we are seeing, and we can’t recover any information about why or how the workers die.
Given that this is such a trivial example, we are puzzled about why this is happening. We checked the Dask Kubernetes and dashboard logs, and typically they just say “Removing worker …” followed by new workers being added, and so on.
In some cases we get an exception saying the tasks couldn’t be retrieved from the workers. Are we doing something wrong here, or is there a bug?
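In case it helps, here is a minimal sketch of how we can also pull logs and scheduler events from the Python side after a failed run (same cluster/client objects as above; get_logs() and get_events() are part of distributed's standard cluster/client API):
# Scheduler and worker logs as collected by the cluster manager
for name, log in cluster.get_logs().items():
    print(name)
    print(log)

# Scheduler-side event log, returned as a dict of topic -> events
events = client.get_events()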
We have also tried the recreate_task_locally() function, but it doesn’t return any error information; it actually returns some values as a NumPy array, which is also confusing.
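For context, this is roughly the calling pattern we mean (a sketch; it assumes the slice is computed via client.compute so that a future handle to the failing task is kept around):
# Keep a future handle instead of calling .compute() directly
future = client.compute(arr[0])

# After the KilledWorker error, re-run the task locally in this process;
# rather than reproducing the failure, it just hands back a NumPy array
local_result = client.recreate_task_locally(future)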
A little more information about our setup:
- Google Cloud
- Google Kubernetes Engine
- We turned on Autopilot
- Running behind a VPC
- Using the default ClusterIP service or an internal LoadBalancer yields the same results.
- If we don’t turn on adaptive scaling and instead scale by hand to many (hundreds of) pods, everything works fine.
- We can scale up or down manually without any issues (see the sketch after this list).
- Operator, Dask, and Distributed are all at 2022.12.0.
- Client, scheduler, and workers are all on the same version.
- LocalCluster works fine too.
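To be concrete about the manual scaling that works, this is roughly what we do instead of adapt() (a sketch; the worker count of 200 is just illustrative):
# Same cluster setup as above, but with fixed sizes instead of adapt()
cluster.scale(200)   # scale up by hand to a few hundred workers
# ... run the same workload ...
cluster.scale(4)     # and back down, also without issues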
We think Autopilot may be the culprit here, but in theory it shouldn’t conflict with anything Dask is doing; it just manages the node pools that Dask asks for. However, we are about to try a new deployment without Autopilot to test whether it is in fact the cause.
Thanks in advance!
Altay