Running Dask on AKS

Hello :wave:
@jacobtomlinson I moved this question here from Slack.
I am new to Dask, and I'm struggling to understand how to use Dask in the cloud.
Our app is a web server in a Docker container, and we want to run long-running computations with Dask on AKS.
I have already created an Azure Kubernetes Service (AKS) cluster and installed Dask using the Helm chart 2024.1.1.
Now my problem is how to configure my code to use this cluster.

I tried the following code, but it doesn't work. Could someone give me a step-by-step snippet showing how to make it work? I'm lost so far and didn't find anything like this in the docs or the forum.

from dask.distributed import Client
from dask_kubernetes.classic import KubeCluster, make_pod_spec
from dask_kubernetes import KubeConfig
from dask.array import random
import os

pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
                         memory_limit='4G', memory_request='4G',
                         cpu_limit=1, cpu_request=1)

def source():
    absolute_path = os.path.dirname(__file__)
    relative_path = "../../kubeconfig.yaml"
    full_path = os.path.join(absolute_path, relative_path)
    if os.path.isfile(full_path):
        auth = KubeConfig(config_file=full_path)
        cluster = KubeCluster(pod_spec, auth=auth)
        cluster.scale(10)
        client = Client(cluster)
        x = random.random((1000, 1000), chunks=(100, 100))
        result = x.sum().compute()

        cluster.scale(0)
        client.close()

Thanks for raising this. The first thing I would say is that you're using the classic KubeCluster, which is deprecated. Please try again with the modern (operator-based) version.

https://kubernetes.dask.org/en/latest/operator_kubecluster.html

But I still don't understand how the connection to AKS works.
Thanks.

The new operator-based KubeCluster has no "auth" argument, so how do I use my "kubeconfig" with it so that it can connect to AKS?

OK, so I managed to do it with Dask Gateway, and I was able to connect to the cluster.
However, when I run the command to scale the cluster, it throws the following error.
I saw that the corresponding ticket on GitHub is closed, so I wonder whether I'm missing something. Do I need to install two Helm charts, one for Dask and one for Gateway?

error:
RuntimeError: Accessing the loop property while the loop is not running is not supported

import logging
from dask_gateway import Gateway

logger = logging.getLogger(__name__)

def dask_operation():
    logger.info("connection to dask AKS")
    gateway = Gateway("http://IP")
    cluster = gateway.new_cluster()
    logger.info("scaling to 10")
    cluster.scale(10)

With the operator, you configure cluster access the same way you would configure kubectl. If your kubeconfig is in the default location (~/.kube/config), it will be picked up automatically; if not, point to it with the KUBECONFIG environment variable.
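For illustration, here is a hypothetical helper that mirrors the lookup kubectl-style clients perform: the paths listed in the KUBECONFIG environment variable (separated by os.pathsep) if it is set, otherwise ~/.kube/config. It is only a sketch under that assumption; resolve_kubeconfig is an illustrative name, not part of dask_kubernetes or kr8s, and it only resolves paths without parsing the file or contacting the cluster.

```python
import os
from pathlib import Path


def resolve_kubeconfig(environ=None):
    """Return the kubeconfig paths a kubectl-style client would consider.

    Hypothetical helper for illustration: KUBECONFIG (a list separated by
    os.pathsep) wins; otherwise fall back to ~/.kube/config.
    """
    environ = os.environ if environ is None else environ
    value = environ.get("KUBECONFIG", "")
    # Split on ":" (Linux/macOS) or ";" (Windows) and drop empty entries.
    paths = [Path(p).expanduser() for p in value.split(os.pathsep) if p]
    if paths:
        return paths
    # Default location used when KUBECONFIG is unset or empty.
    return [Path.home() / ".kube" / "config"]


if __name__ == "__main__":
    for path in resolve_kubeconfig():
        print(path, "exists" if path.is_file() else "missing")
```

Running this inside the container is a quick way to check which file KubeCluster() from dask_kubernetes.operator should end up reading.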

Dask Gateway is a separate project that doesn't currently have any active maintainers. If you're having problems there, I recommend opening an issue on its repo; perhaps someone from the community can help you out.

Hi @ohad, welcome to Dask Discourse,

Please try to explain your problem in detail, whether here or on the Dask Gateway GitHub issue tracker: how did you deploy the Dask Gateway Helm chart (which should work on its own), and what is the complete stack trace you are getting?

Hey, I followed the docs on the Dask website for deploying on Kubernetes.
I used the following example from the docs:

helm install --repo https://helm.dask.org --create-namespace -n dask-gateway --generate-name dask-gateway
from dask_gateway import Gateway
gateway = Gateway("<gateway service address>")
cluster = gateway.new_cluster()

@jacobtomlinson If I use the operator, what do I need to install on the Kubernetes cluster? Just the operator's Helm chart, as in the docs? Also, since I run the app in a Docker container, how would you suggest I store the kubeconfig (it can't be in the default location)? I need a portable solution.
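One portable pattern (an assumption, not something confirmed in this thread) is to inject the kubeconfig contents into the container as a secret environment variable, write them to a private file at startup, and point KUBECONFIG at that file before creating the KubeCluster. The variable name KUBECONFIG_CONTENT and the helper install_kubeconfig_from_env are hypothetical.

```python
import os
import tempfile


def install_kubeconfig_from_env(var="KUBECONFIG_CONTENT", environ=None):
    """Write kubeconfig YAML held in an env var to a private file.

    Hypothetical startup hook: KUBECONFIG_CONTENT is an assumed variable
    name, e.g. populated from a secret by your deployment tooling.
    Returns the file path, or None if the variable is not set.
    """
    environ = os.environ if environ is None else environ
    content = environ.get(var)
    if not content:
        return None
    # mkstemp creates the file readable/writable by the owner only.
    fd, path = tempfile.mkstemp(prefix="kubeconfig-", suffix=".yaml")
    with os.fdopen(fd, "w") as f:
        f.write(content)
    # Point kubectl-compatible clients at the new file.
    environ["KUBECONFIG"] = path
    return path
```

Simpler alternatives are mounting the kubeconfig file as a volume and setting KUBECONFIG to the mount path, or, if the web server itself runs inside AKS, using an in-cluster service account; check the dask-kubernetes and kr8s docs for the latter.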

Then, as @jacobtomlinson says, if you used the basic configuration and are seeing errors, please open a GitHub issue. Could you also explain where you are running the dask_operation() function from?
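If dask_operation() is called directly from an async request handler (the INFO: log prefix hints at an ASGI server such as Uvicorn, but that is only a guess), its blocking calls will stall the server's event loop. One common pattern, sketched here with the standard library only, is to push the blocking work onto a thread with asyncio.to_thread. dask_operation_blocking is a hypothetical stand-in for the real function.

```python
import asyncio
import time


def dask_operation_blocking():
    # Hypothetical stand-in for the real dask_operation(): blocking calls
    # such as KubeCluster(), cluster.scale() or .compute() would go here.
    time.sleep(0.1)
    return 42


async def handler():
    # asyncio.to_thread (Python 3.9+) runs the blocking function in a worker
    # thread, so the server's event loop stays free to serve other requests.
    return await asyncio.to_thread(dask_operation_blocking)


async def main():
    # Show the loop is not blocked: a heartbeat keeps ticking meanwhile.
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    result = await handler()
    hb.cancel()
    return result, ticks
```

Alternatively, dask-kubernetes and distributed offer asynchronous APIs (asynchronous=True), which avoid the extra thread but require awaiting every call.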

I still think I'm doing something wrong. On AKS I only deployed the Gateway. Is that enough? Will the rest of the cluster be created automatically, or do I need to deploy something else as well?

The rest of the cluster should be deployed by the Gateway upon request.

So, I ran the following example, and here is the error with the full stack trace.
On AKS I only installed the Gateway Helm chart, and I'm running the following code:

    logger.info("connection to dask AKS")
    gateway = Gateway(address="http://IP")
    gateway.list_clusters()
    cluster = await gateway.new_cluster()
    logger.info("created cluster")
    cluster.close()
    logger.info("closed cluster")
INFO:     connection to dask AKS
INFO:     created cluster
INFO:     closed cluster
Exception ignored in: <function Gateway.__del__ at 0x7ff63e4c8940>
Traceback (most recent call last):
  File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/dask_gateway/client.py", line 380, in __del__
    self.close()
  File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/dask_gateway/client.py", line 353, in close
    elif self.loop.asyncio_loop.is_running():
  File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/dask_gateway/client.py", line 330, in loop
    return self._loop_runner.loop
  File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/distributed/utils.py", line 660, in loop
    raise RuntimeError(
RuntimeError: Accessing the loop property while the loop is not running is not supported

Are you sure you need to use an await for the new cluster creation?

@jacobtomlinson
I have set up the Dask Operator on my AKS cluster and connected to it with the "kubeconfig", but there is an error in the following example: the code gets stuck and nothing happens.
What could be the issue?

import logging
import time

import dask.array as da
from dask_kubernetes.operator import KubeCluster

logger = logging.getLogger(__name__)

def dask_operation():
    logger.info("connection to dask")
    cluster = KubeCluster()
    client = cluster.get_client()
    logger.info(cluster.dashboard_link)
    logger.info("created cluster")

    # Perform heavy computation using the Dask client
    start = time.time()
    for _ in range(1000):
        result = my_heavy_computation_function()
    runtime = time.time() - start
    logger.info(runtime)

    # Close the cluster and client
    client.close()
    cluster.close()
    logger.info("closed cluster")

def my_heavy_computation_function():
    # Simulate heavy computation by performing a large matrix multiplication
    matrix_size = 1000
    # The shape must be a tuple: a second positional argument is treated as chunks
    matrix_a = da.random.random((matrix_size, matrix_size))
    matrix_b = da.random.random((matrix_size, matrix_size))
    result = da.dot(matrix_a, matrix_b)
    return result.compute()
INFO:     connection to dask
Task exception was never retrieved
future: <Task finished name='Task-14' coro=<PortForward._sync_sockets() done, defined at /home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
╭──────────────── Creating KubeCluster 'dask-ohad-f6e3d861-2' ─────────────────╮
│                                                                              │
│   DaskCluster                                                      Running   │
│   Scheduler Pod                                                    Running   │
│   Scheduler Service                                                Created   │
│   Default Worker Group                                             Created   │
│                                                                              │
│ ⠙ Getting dashboard URL                                                      │
╰──────────────────────────────────────────────────────────────────────────────╯
Task exception was never retrieved
future: <Task finished name='Task-29' coro=<PortForward._sync_sockets() done, defined at /home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
  + Exception Group Traceback (most recent call last):
  |   File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
  |     async with anyio.create_task_group() as tg:
  |   File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
    |     raise ConnectionClosedError("TCP socket closed")
    | kr8s._exceptions.ConnectionClosedError: TCP socket closed
    +------------------------------------
/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/distributed/client.py:1391: VersionMismatchWarning: Mismatched versions found

+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| lz4     | None   | 4.3.3     | None    |
| toolz   | 0.12.1 | 0.12.0    | None    |
| tornado | 6.4    | 6.3.3     | None    |
+---------+--------+-----------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
INFO:     http://localhost:50211/status
INFO:     created cluster

I don't see any errors in your output, just noisy warnings, which should be cleaned up in the next release.

Can you add some more logging to help identify where it is getting stuck?
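A minimal way to follow this suggestion is to wrap each step of dask_operation() in a timing log, so that a hang shows up as a "start" line with no matching "done". The sketch below uses only the standard library; log_step and the logger name "dask_operation" are illustrative choices, not part of Dask.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("dask_operation")


@contextmanager
def log_step(name):
    """Log when a step starts and finishes, with its duration.

    If a step hangs, the last "start" line without a matching "done"
    shows where execution stopped.
    """
    logger.info("start: %s", name)
    t0 = time.perf_counter()
    try:
        yield
    except Exception:
        # Record the failure with its traceback, then let it propagate.
        logger.exception("failed: %s after %.2fs", name, time.perf_counter() - t0)
        raise
    logger.info("done: %s (%.2fs)", name, time.perf_counter() - t0)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with log_step("demo step"):
        time.sleep(0.05)
```

In the operator example, each of KubeCluster(), cluster.get_client() and the .compute() call could go in its own with log_step(...) block.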