Running Dask from a k8s cluster

I am trying to run my Dask application as a cron job from within the cluster. I have created the service account, role, and role binding. It is a very simple demo I am trying to do.
My container has a Python script that runs the code below with the corresponding worker-spec.yml.
How do I use Dask in-cluster?
I am wondering if I am doing it correctly, since I am getting the following errors when the job starts running:

Creating scheduler pod on cluster. This may take some time.
2022-05-17 21:32:17,305 - distributed.deploy.spec - WARNING - Cluster closed without starting up
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 294, in _start
    self.scheduler = await self.scheduler
  File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 59, in _
    await self.start()
  File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 198, in start
    logs = await self.logs()
  File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 118, in logs
    raise e
  File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 109, in logs
    log = await self.core_api.read_namespaced_pod_log(
  File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/api_client.py", line 192, in __call_api
    raise e
  File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/api_client.py", line 185, in __call_api
    response_data = await self.request(
  File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/rest.py", line 193, in GET
    return (await self.request("GET", url,
  File "/usr/local/lib/python3.10/dist-packages/kubernetes_asyncio/client/rest.py", line 187, in request
    raise ApiException(http_resp=r)
kubernetes_asyncio.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: <CIMultiDictProxy('Audit-Id': '7cea313d-78bb-4d52-aada-2a56b256ef0a', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 17 May 2022 21:32:17 GMT', 'Content-Length': '183')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"container dask-worker is not valid for pod dask-root-5c1cad7b-18sbnn","reason":"BadRequest","code":400}

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/app/run.py", line 6, in <module>
    cluster = KubeCluster(pod_template="worker-spec.yml", namespace="myns")
  File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 496, in __init__
    super().__init__(**self.kwargs)
  File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 260, in __init__
    self.sync(self._start)
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 318, in sync
    return sync(
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 385, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.10/dist-packages/distributed/utils.py", line 358, in f
    result = yield future
  File "/usr/local/lib/python3.10/dist-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.10/dist-packages/dask_kubernetes/classic/kubecluster.py", line 627, in _start
    await super()._start()
  File "/usr/local/lib/python3.10/dist-packages/distributed/deploy/spec.py", line 304, in _start
    raise RuntimeError(f"Cluster failed to start: {e}") from e
RuntimeError: Cluster failed to start: (400)
Reason: Bad Request
HTTP response headers: <CIMultiDictProxy('Audit-Id': '7cea313d-78bb-4d52-aada-2a56b256ef0a', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 17 May 2022 21:32:17 GMT', 'Content-Length': '183')>
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"container dask-worker is not valid for pod dask-root-5c1cad7b-18sbnn","reason":"BadRequest","code":400}
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb63f596da0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7fb63f578e80>, 619154.475991593)]']
connector: <aiohttp.connector.TCPConnector object at 0x7fb63f596c20>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb63f597100>
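For context, the 400 in the response body above means the Kubernetes API was asked for the logs of a container named dask-worker in the newly created scheduler pod, and that pod has no container by that name. One cheap sanity check before constructing the cluster is to inspect the container names declared in the parsed pod template. This is a hypothetical helper, not part of dask-kubernetes, and the `worker_spec` dict below is an illustrative stand-in for the result of parsing a worker-spec.yml:

```python
# Hypothetical helper: list the container names declared in a parsed
# pod template (the dict yaml.safe_load would return for worker-spec.yml).
def container_names(pod_spec: dict) -> list[str]:
    return [c["name"] for c in pod_spec.get("spec", {}).get("containers", [])]

# Minimal illustrative worker spec; image and args are placeholders.
worker_spec = {
    "kind": "Pod",
    "spec": {
        "containers": [
            {
                "name": "dask-worker",
                "image": "ghcr.io/dask/dask:latest",
                "args": ["dask-worker", "--nthreads", "2"],
            }
        ]
    },
}

print(container_names(worker_spec))  # ['dask-worker']
```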

Running this from the cron job container:

from dask_kubernetes import KubeCluster
from dask.distributed import Client
import dask.array as da

cluster = KubeCluster(pod_template="worker-spec.yml", namespace="myns")
cluster.adapt(minimum=0, maximum=10)
client = Client(cluster)
array = da.ones((10000, 100000, 1000))
print(array.mean().compute())  # Should print 1.0
client.shutdown()
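As an aside, the array computation itself can be verified outside Kubernetes with Dask's default threaded scheduler on a much smaller array, which helps separate cluster-deployment problems from application problems. A minimal sketch, assuming only dask.array is installed:

```python
import dask.array as da

# Same computation as the script above, shrunk so it runs in-process
# with the default threaded scheduler (no KubeCluster needed).
array = da.ones((1000, 1000), chunks=(250, 250))
result = array.mean().compute()
print(result)  # 1.0
```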

The CronJob YAML:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: test
  namespace: myns
spec:
  schedule: "*/5 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          serviceAccountName: dask-sa
          containers:
            - name: dask-cron  # containers must be named
              image: <my image with the python file>
              imagePullPolicy: IfNotPresent
          restartPolicy: OnFailure  # required in Job pod templates
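For completeness, the dask-sa service account referenced above needs RBAC permissions to manage pods and services in the namespace, since KubeCluster creates scheduler/worker pods and reads their logs. A sketch of what the Role and RoleBinding might look like; the resource names and exact verb list here are illustrative, and the dask-kubernetes documentation has the authoritative set:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: dask-role
  namespace: myns
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "services"]
    verbs: ["get", "list", "watch", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: dask-rolebinding
  namespace: myns
subjects:
  - kind: ServiceAccount
    name: dask-sa
    namespace: myns
roleRef:
  kind: Role
  name: dask-role
  apiGroup: rbac.authorization.k8s.io
```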

Resolution: see Running dask from within k8s cluster · Issue #498 · dask/dask-kubernetes on GitHub.


Thanks for sharing, @jadeidev!