VersionError: How to solve?


I am currently getting a VersionError when calling compute() on a DataFrame. I am using a KubeCluster deployment.

How can I reconcile the VersionError? Is there a way I can know which packages have the version mismatch?
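In case it helps frame the question: the mismatch warning reports each machine's packages as a name-to-version mapping (on a live cluster, `Client.get_versions()` returns per-machine listings of roughly this shape), so a small helper could diff them. This is just a sketch with made-up dicts, not the actual distributed API:

```python
# Hypothetical helper: given per-machine {"packages": {name: version}} dicts,
# report packages whose versions disagree between client, scheduler, workers.
def find_mismatches(versions):
    names = set()
    for info in versions.values():
        names.update(info.get("packages", {}))
    mismatched = {}
    for name in sorted(names):
        seen = {who: info.get("packages", {}).get(name)
                for who, info in versions.items()}
        if len(set(seen.values())) > 1:  # more than one distinct version (None counts)
            mismatched[name] = seen
    return mismatched

# Toy data mirroring the kind of table the warning prints:
versions = {
    "client":    {"packages": {"cloudpickle": "2.0.0", "distributed": "2022.8.0"}},
    "scheduler": {"packages": {"cloudpickle": "2.1.0", "distributed": "2022.8.0",
                               "lz4": "4.0.0"}},
}
print(find_mismatches(versions))  # flags cloudpickle and lz4, not distributed
```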



Some additional information that may help! My workers are retiring (or get hung in the process) when I try the example listed here: DataFrames: Groupby — Dask Examples documentation

I am running these lines:

from dask_kubernetes import KubeCluster, make_pod_spec
import dask.distributed
import dask.dataframe as dd
import dask

pod_spec = make_pod_spec(
    image='',
    memory_limit='2G', memory_request='2G',
    cpu_limit=1, cpu_request=1,
    env={'EXTRA_PIP_PACKAGES': 'fastparquet git+'},
)

cluster = KubeCluster(pod_spec)

df = dask.datasets.timeseries()

@rkoo19 Thanks for the code snippet, could you please share the complete error traceback?

Hi @pavithraes,

Thank you for the reply :slight_smile:

So I had a bit of success with it, but I'm still running into a few errors. I am running this example snippet of code:

import dask

df = dask.datasets.timeseries()

ddf = df[df['id'] > 1000]
dddf = ddf['id'].sum()


The above snippet runs fine, but the following does not (using my own storage system, accessed via a direct HTTP address):

df2 = dd.read_csv("")

new_homes_subset = df2[df2[' "Year"'] < 2000]
large_new_homes_subset = new_homes_subset[' "Year"'].sum()


For extra information, I used the same KubeCluster specifications as described in my original post. When I start the KubeCluster, I get the following warning and error output (the first code snippet still worked):

Creating scheduler pod on cluster. This may take some time.
2022-08-09 15:45:33,758 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
2022-08-09 15:46:07,218 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=0 maximum=inf
/home/ryan/.local/lib/python3.10/site-packages/distributed/ VersionMismatchWarning: Mismatched versions found

| Package     | client   | scheduler             | workers |
|-------------|----------|-----------------------|---------|
| cloudpickle | 2.0.0    | 2.1.0                 | None    |
| distributed | 2022.8.0 | 2022.8.0+11.ge38d3a96 | None    |
| lz4         | None     | 4.0.0                 | None    |
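For what it's worth, one way to resolve this kind of mismatch is to pin identical versions on both sides, e.g. via the same `EXTRA_PIP_PACKAGES` env already used in the pod spec above. This is only a sketch: the pins below are read off the "client" column of the warning table and may not be the right final set for your images.

```python
# Sketch (not verified): pin scheduler/worker packages to match the client.
# Versions taken from the "client" column of the VersionMismatchWarning.
env = {
    "EXTRA_PIP_PACKAGES": "cloudpickle==2.0.0 distributed==2022.8.0",
}
# ...then pass env=env into make_pod_spec(...). The lz4 row (None on the
# client) would instead need `pip install lz4` locally, or removing lz4
# from the worker image, so all three columns line up.
print(env["EXTRA_PIP_PACKAGES"].split())
```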

When I run the second snippet, I get this error:

KilledWorker                              Traceback (most recent call last)
/home/ryan/go/src/ Cell 19' in <cell line: 5>()
      1 new_homes_subset = df2[df2[' "Year"'] < 2000]
      3 large_new_homes_subset = new_homes_subset[' "Year"'].sum()
----> 5 dask.compute(large_new_homes_subset)

File ~/.local/lib/python3.10/site-packages/dask/, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    595     keys.append(x.__dask_keys__())
    596     postcomputes.append(x.__dask_postcompute__())
--> 598 results = schedule(dsk, keys, **kwargs)
    599 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/.local/lib/python3.10/site-packages/distributed/, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3034         should_rejoin = False
   3035 try:
-> 3036     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3037 finally:
   3038     for f in futures.values():

File ~/.local/lib/python3.10/site-packages/distributed/, in Client.gather(self, futures, errors, direct, asynchronous)
   2208 else:
   2209     local_worker = None
-> 2210 return self.sync(
   2211     self._gather,
-> 2073         raise exception.with_traceback(traceback)
   2074     raise exc
   2075 if errors == "skip":

KilledWorker: ("('series-sum-chunk-9caa6ad530ff0dadb4066f0ed23cb95c-5b1eb2849bba2950676223e06e5380f4', 0)", <WorkerState 'tcp://', name: 0, status: closed, memory: 0, processing: 1>)
2022-08-09 15:47:39,223 - distributed.deploy.adaptive - INFO - Retiring workers [0]

Lastly, here is my error log for the scheduler itself (cluster.get_logs()) for running the second snippet of code:

RPCClosed                                 Traceback (most recent call last)
File ~/.local/lib/python3.10/site-packages/distributed/, in rpc.__getattr__.<locals>.send_recv_from_rpc(**kwargs)
   1070 try:
-> 1071     comm = await self.live_comm()
   1072     comm.name = "rpc." + key

File ~/.local/lib/python3.10/site-packages/distributed/, in rpc.live_comm(self)
   1018 if self.status == Status.closed:
-> 1019     raise RPCClosed("RPC Closed")
   1020 to_clear = set()

RPCClosed: RPC Closed

The above exception was the direct cause of the following exception:

RPCClosed                                 Traceback (most recent call last)
/home/ryan/go/src/ Cell 21' in <cell line: 1>()
----> 1 cluster.get_logs()

File ~/.local/lib/python3.10/site-packages/distributed/deploy/, in Cluster.get_logs(self, cluster, scheduler, workers)
    317 def get_logs(self, cluster=True, scheduler=True, workers=True):
    318     """Return logs for the cluster, scheduler and workers
    320     Parameters
   1082         ) from e
   1084 self.comms[comm] = True  # mark as open
   1085 return result

RPCClosed: Exception while trying to call remote method 'get_logs' using comm None.

I would think that if the first snippet of code runs fine, the second one should as well… I may be missing something important.

Thank you in advance!

(Some extra information)

I believe there isn’t any issue with the storage system: when I don’t compute anything and instead try a head() operation on df2 = dd.read_csv(""), it works fine. The .csv file itself is not large at all, either.
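A toy illustration (plain Python, no Dask or cluster involved) of why head() succeeding doesn't guarantee the full compute() will: head-style access only reads a few rows from the first partition, while a sum has to visit every partition on the workers, so worker-side failures can stay hidden until the full computation runs.

```python
# Stand-in for a partitioned DataFrame: a list of per-partition row chunks.
partitions = [[1, 2, 3], [4, 5], [6]]

def head(parts, n=2):
    return parts[0][:n]                  # reads only the first partition

def total(parts):
    return sum(sum(p) for p in parts)    # must visit every partition

print(head(partitions))   # [1, 2]
print(total(partitions))  # 21
```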