Hi @pavithraes,
Thank you for the reply!
So I had a bit of success with it, but I am still running into a few errors. I am running this example snippet of code:
import dask

# Generate a synthetic timeseries Dask DataFrame
df = dask.datasets.timeseries()
# Lazily filter rows, sum the 'id' column, then compute
ddf = df[df['id'] > 1000]
dddf = ddf['id'].sum()
dask.compute(dddf)
The above snippet runs fine, but the following does not (using my own storage system, accessed via a direct HTTP address):
import dask.dataframe as dd  # needed for dd.read_csv

# Read the CSV straight from my storage system over HTTP
df2 = dd.read_csv("http://192.168.49.2:8080/v1/objects/dask-demo-bucket/zillow.csv")
new_homes_subset = df2[df2[' "Year"'] < 2000]
large_new_homes_subset = new_homes_subset[' "Year"'].sum()
dask.compute(large_new_homes_subset)
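To help narrow things down, one check I can think of is running the same read on the local threaded scheduler, bypassing the KubeCluster entirely. If this works locally, the problem is likely that the worker pods cannot reach the endpoint. A minimal sketch, reusing the URL and column name from my snippet above:

import dask
import dask.dataframe as dd

# Same computation, but on the local threaded scheduler instead of the
# KubeCluster; this isolates cluster/networking issues from the read itself.
df2 = dd.read_csv("http://192.168.49.2:8080/v1/objects/dask-demo-bucket/zillow.csv")
total = df2[df2[' "Year"'] < 2000][' "Year"'].sum()
print(dask.compute(total, scheduler="threads"))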
For extra information, I used the same KubeCluster specifications as described in my original post. When I start the KubeCluster, I get the following error and version-mismatch warning (the first code snippet still worked despite them):
Creating scheduler pod on cluster. This may take some time.
2022-08-09 15:45:33,758 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
2022-08-09 15:46:07,218 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=inf
/home/ryan/.local/lib/python3.10/site-packages/distributed/client.py:1309: VersionMismatchWarning: Mismatched versions found
+-------------+----------+-----------------------+---------+
| Package | client | scheduler | workers |
+-------------+----------+-----------------------+---------+
| cloudpickle | 2.0.0 | 2.1.0 | None |
| distributed | 2022.8.0 | 2022.8.0+11.ge38d3a96 | None |
| lz4 | None | 4.0.0 | None |
+-------------+----------+-----------------------+---------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
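The warning suggests my client and the scheduler/worker image are not running identical package versions. If that turns out to matter, here is a sketch of how I could pin the image, assuming the classic dask_kubernetes API and that a daskdev/dask tag matching my client release exists:

from dask_kubernetes import KubeCluster, make_pod_spec

# Pin the scheduler/worker image to the same dask/distributed release as
# the client (2022.8.0 here); the exact tag is an assumption on my part.
pod_spec = make_pod_spec(
    image="daskdev/dask:2022.8.0",
    memory_limit="4G",
    memory_request="4G",
    cpu_limit=1,
    cpu_request=1,
)
cluster = KubeCluster(pod_spec)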
When I run the second snippet, I get this error:
---------------------------------------------------------------------------
KilledWorker Traceback (most recent call last)
/home/ryan/go/src/github.com/NVIDIA/aistore/sdk/python/dask/dask-aistore-demo.ipynb Cell 19' in <cell line: 5>()
1 new_homes_subset = df2[df2[' "Year"'] < 2000]
3 large_new_homes_subset = new_homes_subset[' "Year"'].sum()
----> 5 dask.compute(large_new_homes_subset)
File ~/.local/lib/python3.10/site-packages/dask/base.py:598, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
595 keys.append(x.__dask_keys__())
596 postcomputes.append(x.__dask_postcompute__())
--> 598 results = schedule(dsk, keys, **kwargs)
599 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/.local/lib/python3.10/site-packages/distributed/client.py:3036, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3034 should_rejoin = False
3035 try:
-> 3036 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3037 finally:
3038 for f in futures.values():
File ~/.local/lib/python3.10/site-packages/distributed/client.py:2210, in Client.gather(self, futures, errors, direct, asynchronous)
2208 else:
2209 local_worker = None
-> 2210 return self.sync(
2211 self._gather,
...
-> 2073 raise exception.with_traceback(traceback)
2074 raise exc
2075 if errors == "skip":
KilledWorker: ("('series-sum-chunk-9caa6ad530ff0dadb4066f0ed23cb95c-5b1eb2849bba2950676223e06e5380f4', 0)", <WorkerState 'tcp://172.17.0.6:34509', name: 0, status: closed, memory: 0, processing: 1>)
2022-08-09 15:47:39,223 - distributed.deploy.adaptive - INFO - Retiring workers [0]
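My understanding is that KilledWorker means the worker process died while running the task, so I suspect the worker pods either cannot reach the storage endpoint from inside the cluster or are being killed for memory. A minimal reachability check I could run, assuming requests is installed in the worker image and client is the Client attached to the cluster:

import requests

def can_reach(url="http://192.168.49.2:8080/v1/objects/dask-demo-bucket/zillow.csv"):
    # HEAD request issued from inside each worker pod;
    # returns the HTTP status code seen by that worker
    return requests.head(url).status_code

# Runs the function on every worker and returns {worker_address: result}
client.run(can_reach)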
Lastly, here is the error I get when I try to fetch the scheduler logs with cluster.get_logs() after running the second snippet:
---------------------------------------------------------------------------
RPCClosed Traceback (most recent call last)
File ~/.local/lib/python3.10/site-packages/distributed/core.py:1071, in rpc.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
1070 try:
-> 1071 comm = await self.live_comm()
1072 comm.name = "rpc." + key
File ~/.local/lib/python3.10/site-packages/distributed/core.py:1019, in rpc.live_comm(self)
1018 if self.status == Status.closed:
-> 1019 raise RPCClosed("RPC Closed")
1020 to_clear = set()
RPCClosed: RPC Closed
The above exception was the direct cause of the following exception:
RPCClosed Traceback (most recent call last)
/home/ryan/go/src/github.com/NVIDIA/aistore/sdk/python/dask/dask-aistore-demo.ipynb Cell 21' in <cell line: 1>()
----> 1 cluster.get_logs()
File ~/.local/lib/python3.10/site-packages/distributed/deploy/cluster.py:336, in Cluster.get_logs(self, cluster, scheduler, workers)
317 def get_logs(self, cluster=True, scheduler=True, workers=True):
318 """Return logs for the cluster, scheduler and workers
319
320 Parameters
...
1082 ) from e
1084 self.comms[comm] = True # mark as open
1085 return result
RPCClosed: Exception while trying to call remote method 'get_logs' using comm None.
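Since cluster.get_logs() fails with RPCClosed, a fallback I could try is asking for logs through the client instead (or via kubectl logs on the pods directly). A sketch, assuming the client connection is still alive:

# Fetch logs through the client connection rather than the cluster object
print(client.get_scheduler_logs())
print(client.get_worker_logs())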
I would think that if the first snippet of code runs fine, the second one should as well… I may be missing something important.
Thank you in advance!