Dask-operator lack of resiliency

Hello, after deploying the dask-operator and creating a sample Dask cluster I realized that it creates plain Pods, which makes it quite vulnerable to real-life day-2 operations like k8s upgrades, node failures, etc. For example:

  • If for any reason a Dask worker pod is killed, the operator doesn’t recreate it.
  • If for any reason a Dask scheduler pod is killed, the operator doesn’t recreate it, leaving orphaned Dask worker pods wasting resources.

As I am familiar with kopf (the framework used to develop the operator), I was wondering why you don’t use Deployment specs instead of Pod ones. With this simple change the k8s control plane would be able to recover the Dask cluster from any pod failure, with little impact on the current code.
Am I missing something?
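For illustration, something along these lines is what I have in mind (a rough kopf sketch only; the resource coordinates, labels and image are placeholders, not the operator’s actual code):

import kopf
from kubernetes import client, config

# Hypothetical sketch: create a Deployment for the workers instead of bare Pods,
# so the built-in Deployment controller replaces any Pod lost to eviction,
# node failure or a k8s upgrade.
@kopf.on.create("kubernetes.dask.org", "v1", "daskclusters")
def create_worker_deployment(spec, name, namespace, **kwargs):
    config.load_incluster_config()
    labels = {"dask.org/cluster-name": name, "dask.org/component": "worker"}
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": f"{name}-workers", "labels": labels},
        "spec": {
            "replicas": spec.get("worker", {}).get("replicas", 3),
            "selector": {"matchLabels": labels},
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "containers": [{
                        "name": "worker",
                        "image": "ghcr.io/dask/dask:latest",  # placeholder image
                        "args": ["dask-worker", f"tcp://{name}-scheduler:8786"],
                    }],
                },
            },
        },
    }
    client.AppsV1Api().create_namespaced_deployment(namespace=namespace, body=deployment)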

Hi @davidp1404, welcome to this forum.

This discussion goes beyond my knowledge, but hopefully @jacobtomlinson will be able to give you an answer.

The DaskCluster resource is intended to be the equivalent of a Deployment, but with more communication with the Dask scheduler.

If we used a Deployment and a Pod was lost, the Deployment would create a replacement Pod which the scheduler would see as identical to the old one. However, Dask workers are stateful while work is executing, so the replacement would not have the state the scheduler expects.

It would be better for the Dask Kubernetes controller to notice that the actual number of workers no longer matches the expected scale and to scale up to account for this. That way the scheduler would see the replacement Pod as a new worker with blank state.
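Roughly the kind of reconciliation I mean, as a hedged sketch (the labels, image and scheduler address pattern below are illustrative, not our actual controller code):

import kopf
from kubernetes import client, config

# Hypothetical reconciliation sketch: periodically compare the desired replica
# count of a DaskWorkerGroup against the worker Pods that actually exist and
# create fresh (blank-state) workers to cover any shortfall.
@kopf.timer("kubernetes.dask.org", "v1", "daskworkergroups", interval=15.0)
def reconcile_workers(spec, name, namespace, **kwargs):
    config.load_incluster_config()
    core = client.CoreV1Api()
    desired = spec.get("worker", {}).get("replicas", 0)
    labels = {"dask.org/workergroup-name": name, "dask.org/component": "worker"}
    pods = core.list_namespaced_pod(
        namespace=namespace,
        label_selector=",".join(f"{k}={v}" for k, v in labels.items()),
    )
    alive = [p for p in pods.items if p.status.phase in ("Pending", "Running")]
    for _ in range(desired - len(alive)):
        body = client.V1Pod(
            metadata=client.V1ObjectMeta(generate_name=f"{name}-", labels=labels),
            spec=client.V1PodSpec(containers=[client.V1Container(
                name="worker",
                image="ghcr.io/dask/dask:latest",  # placeholder image
                args=["dask-worker", f"tcp://{spec['cluster']}-scheduler:8786"],
            )]),
        )
        core.create_namespaced_pod(namespace=namespace, body=body)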

We have an open issue tracking this work: Worker group doesn't recover after pod deletion · Issue #603 · dask/dask-kubernetes · GitHub


Hello @jacobtomlinson, I agree with your reasoning, but when using Dask with orchestrators (like Prefect), in my view it makes sense to recover the Dask cluster (i.e. using Deployments) and let the orchestrator manage the impact of failures.

I agree that it makes sense to recover the Dask cluster. My point is that Deployments cannot do that correctly due to the semi-stateful nature of Dask workers, so we’ve implemented our own controller to handle this.

Right now we have a DaskWorkerGroup resource which behaves similarly to a Deployment but with intelligent scale up/down; however, it manages Pods directly. One thing we are considering is wrapping each worker Pod in a ReplicaSet so that the Kubernetes controller handles things like Pod recreation after eviction instead of us having to do it ourselves.
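To illustrate the ReplicaSet idea, the wrapping could look something like this (a sketch only, with assumed labels, not the actual implementation):

from kubernetes import client

# Hypothetical sketch: wrap a single worker in a ReplicaSet with replicas=1 so
# the Kubernetes controller recreates the Pod after eviction, while scale up/down
# still happens one worker at a time by creating or deleting whole ReplicaSets.
def build_worker_replicaset(cluster_name: str, worker_name: str,
                            pod_spec: client.V1PodSpec) -> client.V1ReplicaSet:
    labels = {
        "dask.org/cluster-name": cluster_name,
        "dask.org/component": "worker",
        "dask.org/worker-name": worker_name,  # assumed label, for illustration
    }
    return client.V1ReplicaSet(
        metadata=client.V1ObjectMeta(name=worker_name, labels=labels),
        spec=client.V1ReplicaSetSpec(
            replicas=1,  # exactly one Pod per worker, so worker identity stays meaningful
            selector=client.V1LabelSelector(match_labels=labels),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels=labels),
                spec=pod_spec,
            ),
        ),
    )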

Hi @jacobtomlinson / @davidp1404 ,

We are having a similar issue with pods that are stuck in a pending state or removed due to NetworkNotReady or similar issues, and the scaling/reconciliation process falls apart without retrying.

[2023-06-23 10:46:25,279] kopf.objects         [DEBUG   ] [ns/cluster-name-worker-b6355ac1c0] Something has changed, but we are not interested (the essence is the same).
[2023-06-23 10:46:25,279] kopf.objects         [DEBUG   ] [ns/cluster-name-worker-b6355ac1c0] Handling cycle is finished, waiting for new changes.
[2023-06-23 10:46:29,976] kopf.objects         [DEBUG   ] [ns/cluster-name-worker-b6355ac1c0] Deletion, but we are done with it, and we do not care.
[2023-06-23 10:46:29,976] kopf.objects         [DEBUG   ] [ns/cluster-name-worker-b6355ac1c0] Handling cycle is finished, waiting for new changes.
[2023-06-23 10:46:30,818] kopf.objects         [DEBUG   ] [ns/cluster-name-worker-b6355ac1c0] Deleted, really deleted, and we are notified.

We are using Karpenter to scale the nodes, so it can take a bit of time (but still within a minute or two). If the nodes are not yet ready to place the pods, the Dask operator just ignores the scale request.

  • The above is a scenario where I was trying to scale from 25 to 35 and then to 45.

  • The DaskWorkerGroup spec clearly shows 45.

  • However, the cluster ended up with 28 worker pods (example scenario).

The following is an excerpt from the DaskWorkerGroup manifest:

cluster: cluster-name
worker:
  replicas: 45

Is there a timeout or re-check parameter that we can pass to the operator so that it actually reconciles this properly? Do you have any other suggestions for making this scaling reliable?

This behaviour occurs during initialization, even before any Dask operations are performed.


We should also mention that we have an auto-termination and dashboard-exposure plugin for the dask-operator, but it only hooks into the following kopf events:

@kopf.on.create("service", labels={"dask.org/component": "scheduler"})

@kopf.on.delete("service", labels={"dask.org/component": "scheduler"})

@kopf.on.field("pod", field="status.phase", new="Succeeded", labels={"dask.org/component": "worker"})

Thanks

Sorry you’re running into this. We recently merged some changes which will use deployments to ensure Pods get recreated in cases like this. I’m hoping to get a release out next week containing these fixes.


This is awesome news Jacob. Thank you!!!

We are using the Dask operator in a Kubeflow environment and have a Dask wrapper to integrate with KubeCluster. Another thing we found recently is that the following call does not work consistently. I am not sure what the problem is; perhaps you can give us some hints.

client.wait_for_workers(n_workers, timeout)

This waits indefinitely on

info = await self.scheduler.identity()

The scheduler dashboard clearly shows that all workers are connected and healthy. There are no exceptions or closed connections in the logs either.
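As a sketch of the kind of bounded check we could fall back to while debugging this (the helper below is made up, not part of our wrapper):

import time
from distributed import Client

# Hypothetical debugging sketch: bound the wait and poll scheduler_info(), which
# refreshes via the same scheduler.identity() call, so a hang in wait_for_workers()
# can be told apart from workers genuinely not connecting.
def wait_for_n_workers(client: Client, n_workers: int,
                       timeout: float = 300.0, poll: float = 5.0) -> None:
    workers = {}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        workers = client.scheduler_info().get("workers", {})
        if len(workers) >= n_workers:
            return
        time.sleep(poll)
    raise TimeoutError(f"only {len(workers)} of {n_workers} workers connected")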

Thanks

Sorry for the delay in getting back here. The latest release is out so the improvements I mentioned should be available.

I’ve not seen the other problem you mention. Could you open an issue on GitHub with an explanation of how we can reproduce it so we can investigate?

Hi @jacobtomlinson

After the recent update to 2023.8.0, we have not seen this issue pop up yet.

We will keep you posted if anything comes up.

Thanks.

@jacobtomlinson

Is there a reason why 2023.8.0 started to create N Deployment resources as opposed to 1 Deployment scaled to N replicas?

Isn't it better to leave the scheduling of the Deployments and/or Pods to k8s?

Could you please let us know the motivation behind the N Deployment resources created by the operator?

Thanks,
Jerry

Sure, the main problem is that a Kubernetes Deployment assumes all Pods are stateless, but Dask workers often hold state that we need to manage carefully. When we scale down we want the Dask scheduler to decide, in an intelligent way, which workers to remove, and to migrate their state to other workers before we remove them. We can’t do this with a single Deployment scaled to N workers.

Previously we were just creating Pods directly because of this; however, there are some benefits to having a Deployment scaled to 1, such as the Pod being recreated automatically if it is evicted or if the node is removed.
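To make the scheduler-driven scale-down concrete, here is a minimal sketch using the public distributed client API (an illustration of the idea only; the operator’s own mechanism differs):

from distributed import Client

# Minimal sketch: retire_workers() migrates a worker's in-memory results to the
# surviving workers before shutting it down, which a plain Deployment scale-down
# cannot do. The choice of victims below is a placeholder; the scheduler can pick
# them far more intelligently.
def scale_down_gracefully(client: Client, n_to_remove: int) -> list:
    workers = list(client.scheduler_info()["workers"])
    victims = workers[:n_to_remove]
    client.retire_workers(workers=victims)
    # Only after retirement completes is it safe to delete the matching Pods/Deployments.
    return victims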