CancelledError while computing a registered table using Dask-SQL

Hi all,

I have created a Dask Kubernetes cluster on my local machine using:

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(name="mycluster",
                      image='ghcr.io/dask/dask:latest',
                      n_workers=2,
                      env={"FOO": "bar"},
                      resources={"requests": {"memory": "0.5Gi"}, "limits": {"memory": "2Gi"}})
#cluster.scale(2)
cluster

The cluster was created successfully, and then I ran the following Dask-SQL code:

from dask.distributed import Client
from dask_sql import Context

# Connect Dask to the cluster
client = Client(cluster)

c = Context()  # Python equivalent to a SQL database

c.sql("""
    CREATE OR REPLACE TABLE
        "IRIS_data1"
    WITH (
        location = 'datasets\IRIS.csv',   -- IRIS data, 5 KB
        format = 'csv'
    )
""")

That also succeeded. Then I ran:

c.sql("SHOW TABLES FROM root").compute()

which raised a CancelledError (note: I did not get this error when I tried using a local Dask cluster).

CancelledError                            Traceback (most recent call last)
Cell In[28], line 1
----> 1 c.sql("SHOW TABLES FROM root").compute()

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\dask\base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\dask\base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\client.py:3186, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3184         should_rejoin = False
   3185 try:
-> 3186     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3187 finally:
   3188     for f in futures.values():

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\client.py:2345, in Client.gather(self, futures, errors, direct, asynchronous)
   2343 else:
   2344     local_worker = None
-> 2345 return self.sync(
   2346     self._gather,
   2347     futures,
   2348     errors=errors,
   2349     direct=direct,
   2350     local_worker=local_worker,
   2351     asynchronous=asynchronous,
   2352 )

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\utils.py:349, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    347     return future
    348 else:
--> 349     return sync(
    350         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    351     )

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\utils.py:416, in sync(loop, func, callback_timeout, *args, **kwargs)
    414 if error:
    415     typ, exc, tb = error
--> 416     raise exc.with_traceback(tb)
    417 else:
    418     return result

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\utils.py:389, in sync.<locals>.f()
    387         future = wait_for(future, callback_timeout)
    388     future = asyncio.ensure_future(future)
--> 389     result = yield future
    390 except Exception:
    391     error = sys.exc_info()

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\tornado\gen.py:769, in Runner.run(self)
    766 exc_info = None
    768 try:
--> 769     value = future.result()
    770 except Exception:
    771     exc_info = sys.exc_info()

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\client.py:2209, in Client._gather(self, futures, errors, direct, local_worker)
   2207     else:
   2208         raise exception.with_traceback(traceback)
-> 2209     raise exc
   2210 if errors == "skip":
   2211     bad_keys.add(key)

CancelledError: ('rename-33dbee6297babafb5f2fe10ede4518a8', 0)

Can you please help me with this?

Hi @Nirajkanth, welcome to the Dask Discourse forum!

I took the liberty of editing your post for better code readability.

First, are you able to use your KubeCluster with a simple call like:

a = client.submit(lambda x: x + 1, 10)
a.result()

Hi @guillaumeeb,
Thank you for looking into this. I tried the above-mentioned code:

a = client.submit(lambda x: x + 1, 10)
a.result()

I am still getting the following error:

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Cell In[4], line 1
----> 1 a.result()

File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\client.py:316, in Future.result(self, timeout)
    314     raise exc.with_traceback(tb)
    315 elif self.status == "cancelled":
--> 316     raise result
    317 else:
    318     return result

CancelledError: lambda-2184fb43ada4f66636b120b4ad626720

Please check on this…

Well, it seems your KubeCluster is not correctly started, or there is a problem with it. The issue could be between your Scheduler and Workers, or between your Client and Scheduler. It's hard to debug without having access to it.

Are you able to open the Dask Dashboard? Could you check the logs from the Scheduler and Worker pods?
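If it helps, one way to grab those logs without leaving the notebook is through the cluster object. This is only a sketch: get_logs() is available on recent dask_kubernetes versions and returns a mapping from pod/component name to log text, but if it is missing in your version you can fall back to kubectl.

# Fetch Scheduler and Worker logs through the cluster object,
# if your dask_kubernetes version supports it.
logs = cluster.get_logs()
for name, log in logs.items():
    print(f"=== {name} ===")
    print(log)

From a terminal, kubectl get pods followed by kubectl logs <scheduler-or-worker-pod-name> gives the same information.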

I can only access the Dask dashboard; I cannot access the Scheduler dashboard.
Here is the last part of the log:
[2023-08-03 09:43:44,941] kopf.objects [INFO ] [default/myclust] Timer 'daskcluster_autoshutdown' succeeded.
[2023-08-03 09:43:47,848] kopf.activities.prob [INFO ] Activity 'now' succeeded.
[2023-08-03 09:43:47,849] aiohttp.access [INFO ] 10.244.0.1 [03/Aug/2023:09:43:47 +0000] "GET /healthz HTTP/1.1" 200 214 "-" "kube-probe/1.27"
[2023-08-03 09:43:49,945] kopf.objects [INFO ] [default/myclust] Timer 'daskcluster_autoshutdown' succeeded.
[2023-08-03 09:43:54,947] kopf.objects [INFO ] [default/myclust] Timer 'daskcluster_autoshutdown' succeeded.
[2023-08-03 09:43:57,848] aiohttp.access [INFO ] 10.244.0.1 [03/Aug/2023:09:43:57 +0000] "GET /healthz HTTP/1.1" 200 214 "-" "kube-probe/1.27"
[2023-08-03 09:43:59,952] kopf.objects [INFO ] [default/myclust] Timer 'daskcluster_autoshutdown' succeeded.
[2023-08-03 09:44:04,954] kopf.objects [INFO ] [default/myclust] Timer 'daskcluster_autoshutdown' succeeded.
[2023-08-03 09:44:07,850] kopf.activities.prob [INFO ] Activity 'now' succeeded.
[2023-08-03 09:44:07,852] aiohttp.access [INFO ] 10.244.0.1 [03/Aug/2023:09:44:07 +0000] "GET /healthz HTTP/1.1" 200 214 "-" "kube-probe/1.27"
[2023-08-03 09:44:09,959] kopf.objects [INFO ] [default/myclust] Timer 'daskcluster_autoshutdown' succeeded.

Please have a look

Can you see the Workers on the Dashboard? What happens there when you submit a task?

These logs are not really useful; could you get the Worker pod logs?

Thanks for your effort, but I am still having the issue:


When I submit a task, nothing happens on the Workers.
What might be the reason?

Well, it's very hard to help without logs from the Worker side. Looking at the KubeCluster repr, it seems your cluster is correctly started with connected Workers. Maybe the problem is between the Client and the Scheduler?

Thank you for your suggestion. As you said, there was a problem between the client and the scheduler: it turned out to be a Dask version mismatch, which I have now fixed.
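For reference, a quick way to spot such a mismatch is to compare the package versions reported by the client, scheduler and workers. A small sketch using the client created above (the exact layout of the returned dict may vary between distributed versions):

# With check=True a version mismatch raises immediately, instead of surfacing
# later as hard-to-interpret errors such as CancelledError.
versions = client.get_versions(check=True)
print(versions["client"]["packages"])   # e.g. dask, distributed, python versions seen locally

Client(cluster) usually also emits a version-mismatch warning at connection time, so it is worth watching the notebook output there.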

I am now having another issue, related to the path of the dataset.

Data location: "D:\codes\Dask\datasets\IRIS.csv"
OS : Windows with Jupyter notebook

I read the dataset as follows:

But it shows the following path error:

I am not sure why it takes "\D:/codes/Dask/datasets/IRIS.csv" instead of the original location of the dataset.

If you can comment on this, it would be very helpful.
Thanks in advance.

Windows paths are always complicated to deal with.

But I'm not sure this is the issue: where did you start your KubeCluster? Do the Workers inside it have access to your local file system?
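If the Workers cannot see your local disk (which is usually the case for pods running inside Kubernetes), one workaround for a small file like this 5 KB IRIS.csv is to read it on the client side and register the resulting DataFrame with dask-sql, so the data is shipped with the task graph instead of being read from disk on the Workers. A sketch only, reusing the path and table name from your post:

import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

c = Context()

# Read the small CSV locally, where the Windows path is valid.
df = pd.read_csv(r"D:\codes\Dask\datasets\IRIS.csv")

# Wrap it as a Dask DataFrame; the partitions travel with the graph,
# so the Workers never need access to the local file system.
ddf = dd.from_pandas(df, npartitions=2)

# Register it under the same table name as before and query it.
c.create_table("IRIS_data1", ddf)
c.sql('SELECT * FROM "IRIS_data1" LIMIT 5').compute()

For larger datasets, putting the file somewhere the Workers can reach (object storage, a mounted volume, etc.) and pointing location at that path would be the better option.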

Thanks for your continuous support; I solved this issue using WSL.