Hello @jacobtomlinson, I moved this question here from Slack.
I am new to Dask, and I am struggling to understand how to use Dask in the cloud.
Our app is a web server running in a Docker container, and we want to run long computations with Dask on AKS.
I have already created an Azure Kubernetes Service (AKS) cluster and installed Dask using the Helm chart, version 2024.1.1.
Now my problem is how to configure my code to use this cluster.
I tried the following code, but it doesn't work. Could someone give me a step-by-step snippet showing how to make it work? I'm lost so far and didn't find anything like this in the docs or the forum.
from dask.distributed import Client
from dask_kubernetes.classic import KubeCluster, make_pod_spec
from dask_kubernetes import KubeConfig
from dask.array import random
import os
pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
                         memory_limit='4G', memory_request='4G',
                         cpu_limit=1, cpu_request=1)
def source():
    absolute_path = os.path.dirname(__file__)
    relative_path = "../../kubeconfig.yaml"
    full_path = os.path.join(absolute_path, relative_path)
    if os.path.isfile(full_path):
        auth = KubeConfig(config_file=full_path)
        cluster = KubeCluster(pod_spec, auth=auth)
        cluster.scale(10)
        client = Client(cluster)
        x = random.random((1000, 1000), chunks=(100, 100))
        result = x.sum().compute()
        cluster.scale(0)
        client.close()
Thanks for raising this. The first thing I would say is that you're using the classic KubeCluster, which is deprecated. Please try again with the modern (operator-based) version.
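For comparison, here is a minimal sketch of the snippet above ported to the operator-based `KubeCluster`, assuming the Dask Kubernetes Operator is installed on the cluster and a kubeconfig is available where kubectl would find it. The `image`, `n_workers` and `resources` arguments roughly mirror the classic `make_pod_spec` settings; the cluster name `example-cluster` and the helpers `worker_resources()` and `run_example()` are hypothetical.

```python
from typing import Dict


def worker_resources(memory: str = "4G", cpu: str = "1") -> Dict[str, Dict[str, str]]:
    """Build a Kubernetes requests/limits dict (hypothetical helper).

    Requests equal limits, matching the classic make_pod_spec call above.
    """
    spec = {"memory": memory, "cpu": cpu}
    # Separate copies so mutating one section does not change the other.
    return {"requests": dict(spec), "limits": dict(spec)}


def run_example(n_workers: int = 10) -> float:
    # Imported lazily so this module can be loaded without dask-kubernetes installed.
    from dask_kubernetes.operator import KubeCluster
    import dask.array as da

    # Credentials are read the same way kubectl reads them.
    with KubeCluster(
        name="example-cluster",  # hypothetical cluster name
        image="ghcr.io/dask/dask:latest",
        n_workers=n_workers,
        resources=worker_resources(),
    ) as cluster, cluster.get_client() as client:
        # The client becomes the default, so .compute() runs on the cluster.
        x = da.random.random((1000, 1000), chunks=(100, 100))
        return float(x.sum().compute())
```

Calling `run_example()` from the web server would create the cluster, run the sum and tear everything down when the `with` block exits.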
OK, so I managed to do it with Dask Gateway, and I was able to connect to the cluster.
However, when I run the command to scale the cluster, it throws the following error.
I saw that the ticket is closed on GitHub, so I wonder whether I'm missing something. Do I need to install two Helm charts, one for Dask and one for Gateway?
error:
RuntimeError: Accessing the loop property while the loop is not running is not supported
import logging
from dask_gateway import Gateway
logger = logging.getLogger(__name__)
def dask_operation():
    logger.info("connection to dask AKS")
    gateway = Gateway("http://IP")
    cluster = gateway.new_cluster()
    logger.info("scaling to 10")
    cluster.scale(10)
With the operator, you configure access just as you would for kubectl. If your config is in the default location it will be picked up automatically; if not, set its path with the KUBECONFIG environment variable.
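As a sketch of the non-default case, for example inside a Docker container where the kubeconfig has been mounted as a file, you could point the standard `KUBECONFIG` variable (the one kubectl reads) at that file before creating the first cluster. Setting it in the container environment instead (e.g. `docker run -e KUBECONFIG=...`) should work equally well. The helper `use_kubeconfig()` and the mount path `/secrets/kubeconfig.yaml` are hypothetical.

```python
import os
from pathlib import Path


def use_kubeconfig(path: str) -> str:
    """Point kubectl-style tooling at a kubeconfig file (hypothetical helper).

    Fails early with FileNotFoundError instead of a less obvious
    authentication error later on.
    """
    resolved = Path(path).expanduser().resolve()
    if not resolved.is_file():
        raise FileNotFoundError(f"kubeconfig not found: {resolved}")
    # Set this before creating the first KubeCluster so the Kubernetes
    # client picks it up.
    os.environ["KUBECONFIG"] = str(resolved)
    return str(resolved)


def make_cluster(kubeconfig_path: str = "/secrets/kubeconfig.yaml"):
    # Hypothetical mount point; adjust to wherever the file is mounted.
    use_kubeconfig(kubeconfig_path)
    from dask_kubernetes.operator import KubeCluster  # lazy import

    return KubeCluster(name="example-cluster")  # hypothetical name
```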
Dask Gateway is a separate project that doesn't currently have any active maintainers. If you're having problems there, I recommend opening an issue on the repo; perhaps someone from the community can help you out.
Please try to explain your problem in detail, whether here or on the Dask Gateway GitHub issue tracker. How did you deploy the Dask Gateway Helm chart (which should work on its own), and what is the complete stack trace you are getting?
from dask_gateway import Gateway
gateway = Gateway("<gateway service address>")
cluster = gateway.new_cluster()
@jacobtomlinson If I use the operator, what do I need to install on the Kubernetes cluster? Just the operator's Helm chart, as in the docs? Also, if I run the app in a Docker container, how would you suggest I keep the kubeconfig (it can't be in the default location)? I need a portable solution.
Then, as @jacobtomlinson says, if you used the basic configuration and are seeing errors, please open a GitHub issue. Could you also explain where you are running the dask_operation() function from?
I still think I'm doing something wrong. On AKS I only deployed the Gateway. Is that enough? Will the rest of the cluster be created automatically, or do I need to deploy something else as well?
So, I ran the following example, and here is the error with the full stack trace.
On AKS I only installed the Helm chart for the Gateway,
and I'm running the following code:
INFO: connection to dask AKS
INFO: created cluster
INFO: closed cluster
Exception ignored in: <function Gateway.__del__ at 0x7ff63e4c8940>
Traceback (most recent call last):
File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/dask_gateway/client.py", line 380, in __del__
self.close()
File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/dask_gateway/client.py", line 353, in close
elif self.loop.asyncio_loop.is_running():
File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/dask_gateway/client.py", line 330, in loop
return self._loop_runner.loop
File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/distributed/utils.py", line 660, in loop
raise RuntimeError(
RuntimeError: Accessing the loop property while the loop is not running is not supported
@jacobtomlinson
I have installed the Dask Operator on my AKS cluster and connected to it using the kubeconfig, but the following example throws an error, and then the code gets stuck and nothing happens.
What could be the issue?
import logging
import time
import dask.array as da
from dask_kubernetes.operator import KubeCluster
logger = logging.getLogger(__name__)
def dask_operation():
    logger.info("connection to dask")
    cluster = KubeCluster()
    client = cluster.get_client()
    logger.info(cluster.dashboard_link)
    logger.info("created cluster")
    # Perform heavy computation using the Dask client
    start = time.time()
    for _ in range(1000):
        result = my_heavy_computation_function()
    runtime = time.time() - start
    logger.info(runtime)
    # Close the client first, then the cluster
    client.close()
    cluster.close()
    logger.info("closed cluster")
def my_heavy_computation_function():
    # Simulate heavy computation by performing a large matrix multiplication
    matrix_size = 1000
    # The shape must be a tuple: da.random.random(1000, 1000) would treat the
    # second argument as chunks and create a 1-D array of length 1000
    matrix_a = da.random.random((matrix_size, matrix_size))
    matrix_b = da.random.random((matrix_size, matrix_size))
    result = da.dot(matrix_a, matrix_b)
    return result.compute()
INFO: connection to dask
Task exception was never retrieved
future: <Task finished name='Task-14' coro=<PortForward._sync_sockets() done, defined at /home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
+ Exception Group Traceback (most recent call last):
| File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
| async with anyio.create_task_group() as tg:
| File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
| raise BaseExceptionGroup(
| exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
| raise ConnectionClosedError("TCP socket closed")
| kr8s._exceptions.ConnectionClosedError: TCP socket closed
+------------------------------------
╭──────────────── Creating KubeCluster 'dask-ohad-f6e3d861-2' ────────────────╮
│                                                                              │
│   DaskCluster                                                     Running    │
│   Scheduler Pod                                                   Running    │
│   Scheduler Service                                               Created    │
│   Default Worker Group                                            Created    │
│                                                                              │
│   Getting dashboard URL                                                      │
╰──────────────────────────────────────────────────────────────────────────────╯
Task exception was never retrieved
future: <Task finished name='Task-29' coro=<PortForward._sync_sockets() done, defined at /home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py:167> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError('TCP socket closed')])>
+ Exception Group Traceback (most recent call last):
| File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 171, in _sync_sockets
| async with anyio.create_task_group() as tg:
| File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
| raise BaseExceptionGroup(
| exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/kr8s/_portforward.py", line 183, in _tcp_to_ws
| raise ConnectionClosedError("TCP socket closed")
| kr8s._exceptions.ConnectionClosedError: TCP socket closed
+------------------------------------
/home/ohad/.cache/pypoetry/virtualenvs/webserver-PREBSnLK-py3.10/lib/python3.10/site-packages/distributed/client.py:1391: VersionMismatchWarning: Mismatched versions found
+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| lz4 | None | 4.3.3 | None |
| toolz | 0.12.1 | 0.12.0 | None |
| tornado | 6.4 | 6.3.3 | None |
+---------+--------+-----------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
INFO: http://localhost:50211/status
INFO: created cluster
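The `VersionMismatchWarning` above suggests that the local client environment and the `ghcr.io/dask/dask:latest` image used for the scheduler have drifted apart. One possible mitigation, sketched under the assumption that the registry publishes tags of the form `<dask version>-py<major>.<minor>` (check the available tags before relying on this), is to derive the image tag from the client's own versions and pass it as `image=` to `KubeCluster`. This will not pin every package (lz4, toolz and tornado above can still differ), so installing matching versions locally or building one shared image remains the more robust option. `matching_image()` and `make_cluster()` are hypothetical helpers.

```python
import sys
from typing import Optional, Tuple


def matching_image(
    dask_version: str,
    python_version: Optional[Tuple[int, int]] = None,
    repo: str = "ghcr.io/dask/dask",
) -> str:
    """Return an image reference whose tag follows the client's versions.

    Assumes tags of the form '<dask version>-py<major>.<minor>' exist in repo.
    """
    major, minor = python_version or sys.version_info[:2]
    return f"{repo}:{dask_version}-py{major}.{minor}"


def make_cluster():
    # Lazy imports so the helper above can be used without these installed.
    import dask
    from dask_kubernetes.operator import KubeCluster

    return KubeCluster(
        name="example-cluster",  # hypothetical name
        image=matching_image(dask.__version__),
    )
```

With dask 2024.1.1 on Python 3.10, as in the paths above, `matching_image("2024.1.1", (3, 10))` gives `ghcr.io/dask/dask:2024.1.1-py3.10`.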