KubeCluster for Adaptive Dask

Requirement: using KubeCluster() we are trying to create a Dask cluster adaptively, on the fly. This will be used by independent Python scripts that do ETL.

I am running a Python script in a Kubernetes cluster which uses KubeCluster to adaptively scale a Dask cluster.

I am referring to this page: the KubeCluster section of the Dask Kubernetes documentation (2021.03.0+100.g4a69e3a).

But this is not working. Could you please share a reference document or guide on this?

Could you please share an example of what you have tried and the errors you are seeing?

from dask_kubernetes import KubeCluster, make_pod_spec

pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
                         memory_limit='4G', memory_request='4G',
                         cpu_limit=1, cpu_request=1)

cluster = KubeCluster(pod_spec)

cluster.scale(10)  # specify number of workers explicitly

The image URL is replaced with our Harbor registry URL.

Thanks for that. Ideally I would like to see the full traceback so I can see where the exception is coming from. But from what I can see in that screenshot there may be a proxy that is disrupting communication within the cluster.

Does your Kubernetes cluster use any network encryption tooling like Istio?

@jacobtomlinson Thanks for the reply.

I made slight change

from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client

pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
                         memory_limit='4G', memory_request='4G',
                         cpu_limit=1, cpu_request=1)

cluster = KubeCluster(pod_spec, namespace="dask", deploy_mode="local")


Up to this point it worked. But connecting a client to the cluster

client = Client(cluster)

is failing.

Please let me know if any additional details are required.

Please answer my question about your k8s network setup.

@jacobtomlinson We are not using Istio.

I can see some errors in your logs about application-proxy.blackrock.com which is making me think there may be some network communication issues within your cluster. The scheduler and workers assume they can communicate directly between their Pod IPs. Can you check that this communication works?
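One quick way to check is to exec into a worker Pod and try opening a TCP connection to the scheduler's Pod IP on port 8786 (the default scheduler port). A minimal sketch; the IP in the comment is a placeholder for your actual scheduler Pod IP:

```python
import socket

def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# From inside a worker Pod, against the scheduler's Pod IP (placeholder):
#   can_connect("10.244.1.15", 8786)
```

If this returns False from a worker Pod, the scheduler and workers cannot reach each other directly and the cluster will not come up.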

Thanks for the details. The proxy issue is resolved, but a new error is coming up now.

That kind of error happens when the scheduler and worker have different versions of distributed.

Can you ensure that the version in your environment matches the version in the container image you are using?
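An easy way to check is client.get_versions(check=True), which raises if dask/distributed versions are mismatched across the cluster. For illustration, here is a small helper (find_mismatches is not part of distributed, just a sketch) that compares the scheduler's package versions against the client's, using the dict shape that get_versions() returns:

```python
def find_mismatches(versions):
    """Report packages whose version differs between scheduler and client.

    `versions` is the dict returned by ``client.get_versions()``, which has
    "scheduler", "client" and "workers" entries, each with a "packages" dict.
    Returns {package: (client_version, scheduler_version)} for mismatches.
    """
    sched = versions["scheduler"]["packages"]
    local = versions["client"]["packages"]
    return {
        pkg: (local[pkg], ver)
        for pkg, ver in sched.items()
        if pkg in local and local[pkg] != ver
    }

# Usage with a live cluster:
#   client = Client(cluster)
#   find_mismatches(client.get_versions())
```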

@jacobtomlinson Does dask not support Python 3.7?

We follow NEP 29 and dropped support for Python 3.7 in 2022.2.0. But if you are not able to update to a more recent Python version you should be able to install dask and distributed 2022.1.1.

@jacobtomlinson thanks for the info.
One thing we noticed: no scheduler pod is getting generated.

As per my understanding, running the python script:

  1. creates a dask scheduler
  2. creates the minimum number of worker pods

Correct me if I am wrong.

And one more thing: what will be the dask scheduler capacity (as per the code shared)?

Setting deploy_mode="local" means there will not be a scheduler Pod; the scheduler is created as a subprocess of your local Python process. If you change it back to deploy_mode="remote" then a scheduler Pod will be created (it may have worker in its name, but if you check the labels it is a scheduler component).

When you call cluster.scale(10) it should create 10 worker Pods.

I’m not sure what you mean by scheduler capacity.

By capacity I mean resource limit for dask scheduler.
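Putting the pieces above together, here is a hedged sketch of the remote deployment with adaptive scaling; it needs a live Kubernetes cluster to actually run. The image name is a placeholder (substitute your Harbor image), and scheduler_pod_template is one way to set scheduler resource limits, assuming your dask-kubernetes version supports that argument:

```python
from dask_kubernetes import KubeCluster, make_pod_spec
from dask.distributed import Client

# Worker Pod spec (image name is a placeholder; use your Harbor image).
worker_spec = make_pod_spec(image="ghcr.io/dask/dask:latest",
                            memory_limit="4G", memory_request="4G",
                            cpu_limit=1, cpu_request=1)

# Scheduler resource limits: build a separate spec and pass it via
# scheduler_pod_template (assumed available in this dask-kubernetes version).
scheduler_spec = make_pod_spec(image="ghcr.io/dask/dask:latest",
                               memory_limit="2G", memory_request="2G",
                               cpu_limit=1, cpu_request=1)

cluster = KubeCluster(worker_spec,
                      namespace="dask",
                      deploy_mode="remote",
                      scheduler_pod_template=scheduler_spec)

# Adaptive scaling: grow and shrink between 2 and 10 workers based on load,
# instead of pinning the size with cluster.scale(10).
cluster.adapt(minimum=2, maximum=10)

client = Client(cluster)
```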

@jacobtomlinson thank you so much for your help. This worked after making the following changes:

  • Fixed the proxy issue
  • Updated deploy_mode to "remote"
  • Added the required RoleBinding to the service account
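For anyone hitting the same issue, the RBAC setup we added looks roughly like this; it is a sketch based on the permissions KubeCluster needs to manage Pods and services, and the names (dask-kubecluster, the default service account, the dask namespace) are placeholders for your own:

```yaml
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: dask-kubecluster
  namespace: dask
rules:
- apiGroups: [""]
  resources: ["pods", "services"]
  verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get", "list"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: dask-kubecluster
  namespace: dask
subjects:
- kind: ServiceAccount
  name: default        # the service account the script runs as (placeholder)
  namespace: dask
roleRef:
  kind: Role
  name: dask-kubecluster
  apiGroup: rbac.authorization.k8s.io
```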

Thanks again for all your support. Really appreciate it.
