I’m encountering an issue while trying to start a Dask cluster in Kubeflow. Here’s what I’ve done so far:
- I installed the Dask Kubernetes operator using the following Helm command:

  $ helm install --create-namespace -n dask-operator --generate-name dask/dask-kubernetes-operator --version 2024.3.1
- I patched the `kubeflow-kubernetes-edit` cluster role using the following command:

  $ kubectl patch clusterrole kubeflow-kubernetes-edit --type="json" --patch '[{"op": "add", "path": "/rules/-", "value": {"apiGroups": ["kubernetes.dask.org"],"resources": ["*"],"verbs": ["*"]}}]'
- I started a notebook and ran the following Python code:

  from dask_kubernetes.operator import KubeCluster
  cluster = KubeCluster(name="my-dask-cluster", image='ghcr.io/dask/dask:latest', n_workers=3, resources={"requests": {"memory": "1Gi"}, "limits": {"memory": "1Gi"}})
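However, I encountered the following error (note: this traceback comes from a run of the same call with `name="dask-cluster"` and `n_workers=2`):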
---------------------------------------------------------------------------
ObjectDoesNotExist Traceback (most recent call last)
Cell In [2], line 1
----> 1 cluster = KubeCluster(name="dask-cluster", image='ghcr.io/dask/dask:latest', n_workers=2, resources={"requests": {"memory": "1Gi"}, "limits": {"memory": "1Gi"}})
File /opt/conda/lib/python3.8/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py:244, in KubeCluster.__init__(self, name, namespace, image, n_workers, resources, env, worker_command, auth, port_forward_cluster_ip, create_mode, shutdown_on_close, resource_timeout, scheduler_service_type, custom_cluster_spec, scheduler_forward_port, **kwargs)
242 if not self.asynchronous:
243 self._loop_runner.start()
--> 244 self.sync(self._start)
File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
349 return future
350 else:
--> 351 return sync(
352 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
353 )
File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
416 if error:
417 typ, exc, tb = error
--> 418 raise exc.with_traceback(tb)
419 else:
420 return result
File /opt/conda/lib/python3.8/site-packages/distributed/utils.py:391, in sync.<locals>.f()
389 future = wait_for(future, callback_timeout)
390 future = asyncio.ensure_future(future)
--> 391 result = yield future
392 except Exception:
393 error = sys.exc_info()
File /opt/conda/lib/python3.8/site-packages/tornado/gen.py:769, in Runner.run(self)
766 exc_info = None
768 try:
--> 769 value = future.result()
770 except Exception:
771 exc_info = sys.exc_info()
File /opt/conda/lib/python3.8/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py:280, in KubeCluster._start(self)
278 else:
279 self._log("Creating cluster")
--> 280 await self._create_cluster()
282 await super()._start()
283 self._log(f"Ready, dashboard available at {self.dashboard_link}")
File /opt/conda/lib/python3.8/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py:340, in KubeCluster._create_cluster(self)
338 try:
339 self._log("Waiting for scheduler pod")
--> 340 await wait_for_scheduler(
341 self.k8s_api,
342 self.name,
343 self.namespace,
344 timeout=self._resource_timeout,
345 )
346 except CrashLoopBackOffError as e:
347 logs = await self._get_logs()
File /opt/conda/lib/python3.8/site-packages/dask_kubernetes/common/networking.py:199, in wait_for_scheduler(api, cluster_name, namespace, timeout)
197 pod_start_time = None
198 while True:
--> 199 pod = await Pod.objects(api, namespace=namespace).get_by_name(
200 cluster_name + "-scheduler"
201 )
202 phase = pod.obj["status"]["phase"]
203 if phase == "Running":
File /opt/conda/lib/python3.8/site-packages/dask_kubernetes/aiopykube/query.py:19, in Query.get_by_name(self, name)
18 async def get_by_name(self, name: str):
---> 19 return await self._sync(super().get_by_name, name=name)
File /opt/conda/lib/python3.8/site-packages/dask_kubernetes/aiopykube/mixins.py:9, in AsyncMixin._sync(self, func, *args, **kwargs)
8 async def _sync(self, func, *args, **kwargs):
----> 9 return await asyncio.get_event_loop().run_in_executor(
10 None, functools.partial(func, *args, **kwargs)
11 )
File /opt/conda/lib/python3.8/concurrent/futures/thread.py:57, in _WorkItem.run(self)
54 return
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
File /opt/conda/lib/python3.8/site-packages/pykube/query.py:119, in Query.get_by_name(self, name)
117 print(r)
118 if r.status_code == 404:
--> 119 raise ObjectDoesNotExist(f"{name} does not exist.")
120 self.api.raise_for_status(r)
121 return self.api_obj_class(self.api, r.json())
ObjectDoesNotExist: dask-cluster-scheduler does not exist.
Could someone please help me understand why I’m encountering this error and how I can resolve it? Any guidance or suggestions would be greatly appreciated!