Deploying Dask with AKS

I admit I’m very new to Dask and don’t fully understand all the nuances. That being said, I’ve been tasked with using Dask on AKS with an orchestration tool called Prefect. Right now I’m just trying to connect to and spin up a Dask cluster and perform a simple operation like scale it up. I’ve installed the operator using helm as per the website and am running a simple python script to run the Dask scaling task:

from dask_kubernetes.operator import KubeCluster

# Create a Dask cluster, scale it to two workers, then tear it down
cluster = KubeCluster(name='testcluster')
cluster.scale(2)
cluster.close()

When I run the script I get a 401 Unauthorized error. I assume the Kubernetes service account that is running the Dask task doesn’t have permissions but I don’t know how to troubleshoot this. Any help is greatly appreciated.

If you’re seeing the 401 at this point, my guess is that you don’t have permission to create the custom resources that dask-kubernetes uses, rather than the controller lacking permissions.

I think the next troubleshooting step would be to try creating a cluster with kubectl instead of the Python API.

https://kubernetes.dask.org/en/latest/operator_resources.html#daskcluster

I was able to deploy a Dask cluster successfully using kubectl with the example manifest in the link you provided. If it makes a difference, the AKS cluster was created with AAD authentication. I don’t think that should matter when I’m running the Python script locally, though, since I’m already connected to the Kubernetes cluster.

How does it connect through the Python API? Does it leverage the local kubeconfig? What’s different about creating the cluster via kubectl versus through the Python API?

Ok great.

The Python API uses a combination of pykube-ng and kubernetes-asyncio to interact with the Kubernetes API. It reads either your local kubeconfig or service account credentials if being used in-cluster. I wonder if AAD uses some kind of kubectl plugin that these libraries don’t support? We do have to do some extra things for GKE auth.
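
To illustrate the distinction, here is a rough sketch of how Kubernetes client libraries conventionally decide between the two credential sources. The helper name is mine, purely for illustration; it is not part of dask-kubernetes or kubernetes-asyncio:

```python
import os

def pick_auth_source(env):
    # Hypothetical helper sketching the usual decision: Kubernetes injects
    # KUBERNETES_SERVICE_HOST into every pod, so its presence is the
    # conventional "am I running in-cluster?" check.
    if "KUBERNETES_SERVICE_HOST" in env:
        return "in-cluster service account"
    # Outside a cluster, fall back to $KUBECONFIG or the default path.
    return env.get("KUBECONFIG", os.path.expanduser("~/.kube/config"))

print(pick_auth_source({"KUBERNETES_SERVICE_HOST": "10.0.0.1"}))  # in-cluster service account
print(pick_auth_source({"KUBECONFIG": "/tmp/config"}))            # /tmp/config
```

In your case (running the script locally) it is the kubeconfig branch that matters, which is why the auth plugin configured there is the thing to look at.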

The Python API effectively does what you would’ve done via kubectl. It generates a DaskCluster manifest based on the arguments and then creates it. It also does a couple of extra things like trying to set up a port forward. Could you share the full traceback so we can see exactly what operation is causing the 401?
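
To make that concrete, here is a heavily trimmed, hypothetical sketch of the manifest-building step (the function name and abridged spec are mine; the real manifest dask-kubernetes generates includes full scheduler and worker pod specs):

```python
def make_daskcluster_body(name, n_workers=0):
    # Hypothetical, trimmed version of the DaskCluster custom resource body
    # that KubeCluster builds from its arguments; see the operator docs for
    # the real schema.
    return {
        "apiVersion": "kubernetes.dask.org/v1",
        "kind": "DaskCluster",
        "metadata": {"name": name},
        "spec": {"worker": {"replicas": n_workers}},  # real spec also has pod templates
    }

body = make_daskcluster_body("testcluster", n_workers=2)

# The call that fails with 401 in a case like yours is then roughly:
#   await custom_objects_api.create_namespaced_custom_object(
#       group="kubernetes.dask.org", version="v1",
#       namespace="default", plural="daskclusters", body=body,
#   )
print(body["kind"])  # DaskCluster
```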

Sure, here’s the traceback:

ERROR:root:
Traceback (most recent call last):
  File "C:\Tools\Anaconda3\lib\site-packages\dask_kubernetes\operator\kubecluster\kubecluster.py", line 284, in _create_cluster
    await custom_objects_api.create_namespaced_custom_object(
  File "C:\Tools\Anaconda3\lib\site-packages\kubernetes_asyncio\client\api_client.py", line 192, in __call_api
    raise e
  File "C:\Tools\Anaconda3\lib\site-packages\kubernetes_asyncio\client\api_client.py", line 185, in __call_api
    response_data = await self.request(
  File "C:\Tools\Anaconda3\lib\site-packages\kubernetes_asyncio\client\rest.py", line 230, in POST
    return (await self.request("POST", url,
  File "C:\Tools\Anaconda3\lib\site-packages\kubernetes_asyncio\client\rest.py", line 187, in request
    raise ApiException(http_resp=r)
kubernetes_asyncio.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: <CIMultiDictProxy('Audit-Id': '9b4da9c5-49ba-48e5-bc35-4ba247a991b9', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 14 Mar 2023 17:03:06 GMT', 'Content-Length': '129')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}



The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Projects\Lennar\dask\test.py", line 3, in <module>
    cluster = KubeCluster(name='testcluster',namespace='prefect-dev')
  File "C:\Tools\Anaconda3\lib\site-packages\dask_kubernetes\operator\kubecluster\kubecluster.py", line 229, in __init__
    self.sync(self._start)
  File "C:\Tools\Anaconda3\lib\site-packages\distributed\utils.py", line 338, in sync
    return sync(
  File "C:\Tools\Anaconda3\lib\site-packages\distributed\utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "C:\Tools\Anaconda3\lib\site-packages\distributed\utils.py", line 378, in f
    result = yield future
  File "C:\Tools\Anaconda3\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "C:\Tools\Anaconda3\lib\site-packages\dask_kubernetes\operator\kubecluster\kubecluster.py", line 252, in _start
    await self._create_cluster()
  File "C:\Tools\Anaconda3\lib\site-packages\dask_kubernetes\operator\kubecluster\kubecluster.py", line 292, in _create_cluster
    raise RuntimeError(
RuntimeError: Failed to create DaskCluster resource. Are the Dask Custom Resource Definitions installed? https://kubernetes.dask.org/en/latest/operator.html#installing-the-operator
ERROR:root:
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "C:\Tools\Anaconda3\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Tools\Anaconda3\lib\asyncio\base_events.py", line 647, in run_until_complete
    return future.result()
  File "C:\Tools\Anaconda3\lib\site-packages\dask_kubernetes\operator\kubecluster\kubecluster.py", line 909, in _reap_clusters
    cluster.close(timeout=10)
  File "C:\Tools\Anaconda3\lib\site-packages\dask_kubernetes\operator\kubecluster\kubecluster.py", line 571, in close
    return self.sync(self._close, timeout=timeout)
  File "C:\Tools\Anaconda3\lib\site-packages\distributed\utils.py", line 338, in sync
    return sync(
  File "C:\Tools\Anaconda3\lib\site-packages\distributed\utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "C:\Tools\Anaconda3\lib\site-packages\distributed\utils.py", line 378, in f
    result = yield future
  File "C:\Tools\Anaconda3\lib\site-packages\tornado\gen.py", line 762, in run
    value = future.result()
  File "C:\Tools\Anaconda3\lib\site-packages\dask_kubernetes\operator\kubecluster\kubecluster.py", line 579, in _close
    await custom_objects_api.delete_namespaced_custom_object(
  File "C:\Tools\Anaconda3\lib\site-packages\kubernetes_asyncio\client\api_client.py", line 185, in __call_api
    response_data = await self.request(
  File "C:\Tools\Anaconda3\lib\site-packages\kubernetes_asyncio\client\rest.py", line 220, in DELETE
    return (await self.request("DELETE", url,
  File "C:\Tools\Anaconda3\lib\site-packages\kubernetes_asyncio\client\rest.py", line 177, in request
    r = await self.pool_manager.request(**args)
  File "C:\Tools\Anaconda3\lib\site-packages\aiohttp\client.py", line 536, in _request
    conn = await self._connector.connect(
  File "C:\Tools\Anaconda3\lib\site-packages\aiohttp\connector.py", line 540, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Tools\Anaconda3\lib\site-packages\aiohttp\connector.py", line 901, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "C:\Tools\Anaconda3\lib\site-packages\aiohttp\connector.py", line 1152, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
  File "C:\Tools\Anaconda3\lib\site-packages\aiohttp\connector.py", line 874, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
  File "C:\Tools\Anaconda3\lib\site-packages\aiohttp\resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
  File "C:\Tools\Anaconda3\lib\asyncio\base_events.py", line 861, in getaddrinfo
    return await self.run_in_executor(
  File "C:\Tools\Anaconda3\lib\asyncio\base_events.py", line 819, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "C:\Tools\Anaconda3\lib\concurrent\futures\thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown

Yeah, ok, so it is the custom_objects_api.create_namespaced_custom_object call that fails, which is the equivalent of the kubectl create -f daskcluster.yaml call.

Would you mind sharing your kubeconfig file (removing any certs/tokens/passwords) so I can see if it uses some kind of AKS/AAD plugin? My guess is we need to add support for this in dask-kubernetes.

I believe the plugin you’re looking for is kubelogin (Azure/kubelogin on GitHub: a Kubernetes credential (exec) plugin implementing Azure authentication). For security reasons I’d rather not share my full kubeconfig file, even with the secret values omitted, but I think this is the relevant section:

users:
- name: clusterUser_resourceGroup_aksClusterName
  user:
    exec:
      apiVersion: client.authentication.k8s.io/v1beta1
      args:
      - get-token
      - --environment
      - AzurePublicCloud
      - --server-id
      - <server id>
      - --client-id
      - <client id>
      - --tenant-id
      - <tenant id>
      - --login
      - devicecode
      command: kubelogin
      env: null
      interactiveMode: IfAvailable
      provideClusterInfo: false

That’s perfect thanks, that’s enough to point me in the right direction.

I think the answer today is that none of the available Kubernetes Python libraries appear to support exec authentication methods, so we can’t support AKS/AAD right now.

I’ve opened Support kubectl auth plugins for Python API (dask/dask-kubernetes#678), where we will investigate adding support in the future.