VersionMismatchWarning: How to solve?

Hi,

I am currently getting a version mismatch error when I call compute() on a DataFrame. I am using a KubeCluster deployment.

How can I resolve the version mismatch? Is there a way to find out which packages have mismatched versions?
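
(For context, here is roughly how I understand the version check can be run by hand; a minimal sketch, assuming a reachable scheduler, with the address below just a placeholder:)

from dask.distributed import Client

# Placeholder address; in my case this would be the KubeCluster's scheduler.
client = Client("tcp://<scheduler-address>:8786")

# Collect package versions from the client, the scheduler, and every worker.
versions = client.get_versions()
print(versions.keys())  # 'client', 'scheduler', 'workers'

# With check=True the client raises an error describing any mismatched packages.
client.get_versions(check=True)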

Thanks,

Ryan

Some additional information that may help: my workers keep retiring (or hang in the process) when I try the example listed here: DataFrames: Groupby — Dask Examples documentation

I am running these lines:

from dask_kubernetes import KubeCluster, make_pod_spec
import dask.distributed
import dask.dataframe as dd
import dask

# Pod spec shared by the scheduler and the workers
pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
    memory_limit='2G', memory_request='2G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})

cluster = KubeCluster(pod_spec)
cluster.adapt()  # let the cluster scale adaptively
client = dask.distributed.Client(cluster)

df = dask.datasets.timeseries()
df.groupby('name').x.mean().compute()
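
(As a side note, I suspect the git install of distributed in EXTRA_PIP_PACKAGES is what makes the scheduler/worker versions drift away from my local client. Here is a sketch of a pod spec pinned to one release instead; the image tag below is an assumption on my part, not something I have verified:)

# Sketch: pin the image to the same release as the client environment and drop
# the git install of distributed, so client, scheduler, and workers agree.
# The '2022.8.0' tag is an assumption; it should match whatever dask.__version__
# reports on the client.
pinned_pod_spec = make_pod_spec(image='ghcr.io/dask/dask:2022.8.0',
    memory_limit='2G', memory_request='2G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'fastparquet'})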

@rkoo19 Thanks for the code snippet. Could you please share the complete error traceback?

Hi @pavithraes,

Thank you for the reply :slight_smile:

So I had a bit of success with it, but I am still running into a few errors. I am running this example snippet of code:

import dask

df = dask.datasets.timeseries()

ddf = df[df['id'] > 1000]
dddf = ddf['id'].sum()

dask.compute(dddf)

The above snippet runs fine, but the following does not (I am using my own storage system, accessed via a direct HTTP address):

df2 = dd.read_csv("http://192.168.49.2:8080/v1/objects/dask-demo-bucket/zillow.csv")

new_homes_subset = df2[df2[' "Year"'] < 2000]
large_new_homes_subset = new_homes_subset[' "Year"'].sum()

dask.compute(large_new_homes_subset)
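
(The odd-looking column name ' "Year"' comes from whitespace and quoting in the CSV header; here is a small sketch of how the column names could be normalised after reading — purely illustrative, not something I have verified against this file:)

# Sketch: strip surrounding whitespace and stray quotes from the header names
# so the column can be referenced simply as 'Year'.
df2 = dd.read_csv("http://192.168.49.2:8080/v1/objects/dask-demo-bucket/zillow.csv")
df2.columns = [c.strip().strip('"') for c in df2.columns]

year_total = df2[df2['Year'] < 2000]['Year'].sum()
dask.compute(year_total)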

For extra information, I used the same KubeCluster specification as described in my original post. When I start the KubeCluster, I get the following output (the first code snippet still worked despite it):

Creating scheduler pod on cluster. This may take some time.
2022-08-09 15:45:33,758 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
2022-08-09 15:46:07,218 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=inf
/home/ryan/.local/lib/python3.10/site-packages/distributed/client.py:1309: VersionMismatchWarning: Mismatched versions found

+-------------+----------+-----------------------+---------+
| Package     | client   | scheduler             | workers |
+-------------+----------+-----------------------+---------+
| cloudpickle | 2.0.0    | 2.1.0                 | None    |
| distributed | 2022.8.0 | 2022.8.0+11.ge38d3a96 | None    |
| lz4         | None     | 4.0.0                 | None    |
+-------------+----------+-----------------------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

When I run the second snippet, I get this error:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
/home/ryan/go/src/github.com/NVIDIA/aistore/sdk/python/dask/dask-aistore-demo.ipynb Cell 19' in <cell line: 5>()
      1 new_homes_subset = df2[df2[' "Year"'] < 2000]
      3 large_new_homes_subset = new_homes_subset[' "Year"'].sum()
----> 5 dask.compute(large_new_homes_subset)

File ~/.local/lib/python3.10/site-packages/dask/base.py:598, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    595     keys.append(x.__dask_keys__())
    596     postcomputes.append(x.__dask_postcompute__())
--> 598 results = schedule(dsk, keys, **kwargs)
    599 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/.local/lib/python3.10/site-packages/distributed/client.py:3036, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3034         should_rejoin = False
   3035 try:
-> 3036     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3037 finally:
   3038     for f in futures.values():

File ~/.local/lib/python3.10/site-packages/distributed/client.py:2210, in Client.gather(self, futures, errors, direct, asynchronous)
   2208 else:
   2209     local_worker = None
-> 2210 return self.sync(
   2211     self._gather,
...
-> 2073         raise exception.with_traceback(traceback)
   2074     raise exc
   2075 if errors == "skip":

KilledWorker: ("('series-sum-chunk-9caa6ad530ff0dadb4066f0ed23cb95c-5b1eb2849bba2950676223e06e5380f4', 0)", <WorkerState 'tcp://172.17.0.6:34509', name: 0, status: closed, memory: 0, processing: 1>)
2022-08-09 15:47:39,223 - distributed.deploy.adaptive - INFO - Retiring workers [0]
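
(In case it is relevant: as I understand it, KilledWorker usually means the worker process itself died, often from hitting its memory limit. Here is a sketch of the same pod spec with a roomier memory allocation, just as an experiment — the 4G figure is an arbitrary guess, not a known fix:)

# Sketch: identical pod spec but with a larger memory limit, to rule out
# the 2G cap as the cause of the KilledWorker. 4G is an arbitrary guess.
bigger_pod_spec = make_pod_spec(image='ghcr.io/dask/dask:latest',
    memory_limit='4G', memory_request='4G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'})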

Lastly, here is the error I get when I try to pull the scheduler's logs (cluster.get_logs()) after running the second snippet of code:

---------------------------------------------------------------------------
RPCClosed                                 Traceback (most recent call last)
File ~/.local/lib/python3.10/site-packages/distributed/core.py:1071, in rpc.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
   1070 try:
-> 1071     comm = await self.live_comm()
   1072     comm.name = "rpc." + key

File ~/.local/lib/python3.10/site-packages/distributed/core.py:1019, in rpc.live_comm(self)
   1018 if self.status == Status.closed:
-> 1019     raise RPCClosed("RPC Closed")
   1020 to_clear = set()

RPCClosed: RPC Closed

The above exception was the direct cause of the following exception:

RPCClosed                                 Traceback (most recent call last)
/home/ryan/go/src/github.com/NVIDIA/aistore/sdk/python/dask/dask-aistore-demo.ipynb Cell 21' in <cell line: 1>()
----> 1 cluster.get_logs()

File ~/.local/lib/python3.10/site-packages/distributed/deploy/cluster.py:336, in Cluster.get_logs(self, cluster, scheduler, workers)
    317 def get_logs(self, cluster=True, scheduler=True, workers=True):
    318     """Return logs for the cluster, scheduler and workers
    319 
    320     Parameters
...
   1082         ) from e
   1084 self.comms[comm] = True  # mark as open
   1085 return result

RPCClosed: Exception while trying to call remote method 'get_logs' using comm None.
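
(Since cluster.get_logs() fails with RPCClosed once the connection has gone away, this is a sketch of how I understand logs can also be pulled through the client itself, assuming the client connection is still alive:)

# Sketch: pull logs through a still-connected Client instead of cluster.get_logs().
# Assumes `client` is the dask.distributed.Client attached to the KubeCluster.
print(client.get_scheduler_logs())  # recent scheduler log records
print(client.get_worker_logs())     # mapping of worker address -> recent log records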

I would think that if the first snippet of code runs fine, the second one should as well… I may be missing something important.

Thank you in advance!

(Some extra information)

I don’t believe there is any issue with the storage system: when I skip compute() and instead call head() on df2 = dd.read_csv("http://192.168.49.2:8080/v1/objects/dask-demo-bucket/zillow.csv"), it works fine. The .csv file itself is not large at all, either.
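
(One more experiment I can think of, since head() only touches the start of the file while compute() pulls everything through the workers: reading the whole CSV as a single partition, to separate "reading the file" problems from "cluster/worker" problems. blocksize=None below is just that experiment, not a recommended setting:)

# Sketch: read the small CSV as one partition and compute the same reduction,
# to check whether partitioning/worker transfer is what goes wrong.
df2_single = dd.read_csv(
    "http://192.168.49.2:8080/v1/objects/dask-demo-bucket/zillow.csv",
    blocksize=None,  # one partition for the whole (small) file
)
df2_single.columns = [c.strip().strip('"') for c in df2_single.columns]
print(dask.compute(df2_single[df2_single['Year'] < 2000]['Year'].sum()))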