Hi all,
I have created a Dask Kubernetes cluster on my local machine using:
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(name="mycluster",
image='ghcr.io/dask/dask:latest',
n_workers=2,
env={"FOO": "bar"},
resources={"requests": {"memory": "0.5Gi"}, "limits": {"memory": "2Gi"}})
#cluster.scale(2)
cluster
The cluster was created successfully, and then I used the following Dask-SQL code:
from dask.distributed import Client
# Connect Dask to the cluster
client = Client(cluster)
c = Context() # Python equivalent to a SQL database
c.sql("""
CREATE OR REPLACE TABLE
"IRIS_data1"
WITH (
location = 'datasets\IRIS.csv', # IRIS data 5KB
format = 'csv'
)
""")
That was also successful; then I ran:
c.sql("SHOW TABLES FROM root").compute()
This raised a CancelledError (note: I did not get this error when I tried using a Dask LocalCluster).
CancelledError Traceback (most recent call last)
Cell In[28], line 1
----> 1 c.sql("SHOW TABLES FROM root").compute()
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\dask\base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
290 def compute(self, **kwargs):
291 """Compute this dask collection
292
293 This turns a lazy Dask collection into its in-memory equivalent.
(...)
312 dask.base.compute
313 """
--> 314 (result,) = compute(self, traverse=False, **kwargs)
315 return result
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\dask\base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
596 keys.append(x.__dask_keys__())
597 postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\client.py:3186, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3184 should_rejoin = False
3185 try:
-> 3186 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3187 finally:
3188 for f in futures.values():
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\client.py:2345, in Client.gather(self, futures, errors, direct, asynchronous)
2343 else:
2344 local_worker = None
-> 2345 return self.sync(
2346 self._gather,
2347 futures,
2348 errors=errors,
2349 direct=direct,
2350 local_worker=local_worker,
2351 asynchronous=asynchronous,
2352 )
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\utils.py:349, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
347 return future
348 else:
--> 349 return sync(
350 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
351 )
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\utils.py:416, in sync(loop, func, callback_timeout, *args, **kwargs)
414 if error:
415 typ, exc, tb = error
--> 416 raise exc.with_traceback(tb)
417 else:
418 return result
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\utils.py:389, in sync.<locals>.f()
387 future = wait_for(future, callback_timeout)
388 future = asyncio.ensure_future(future)
--> 389 result = yield future
390 except Exception:
391 error = sys.exc_info()
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\tornado\gen.py:769, in Runner.run(self)
766 exc_info = None
768 try:
--> 769 value = future.result()
770 except Exception:
771 exc_info = sys.exc_info()
File ~\Anaconda\envs\dask_k8s_310\lib\site-packages\distributed\client.py:2209, in Client._gather(self, futures, errors, direct, local_worker)
2207 else:
2208 raise exception.with_traceback(traceback)
-> 2209 raise exc
2210 if errors == "skip":
2211 bad_keys.add(key)
CancelledError: ('rename-33dbee6297babafb5f2fe10ede4518a8', 0)
Can you please help me with this?