"TypeError: cannot pickle '_asyncio.Task' object" only when executing function inside of a class

I have created a script that reads NetCDF4 files in parallel from a private s3 bucket. This script utilizes fsspec and xarray to index and open files while dask parallelizes the fetching, opening, and processing of the files. However, when I take this script and turn the core functionality into a class, I receive the following error:

Traceback (most recent call last):
  File "/home/mgray/miniforge3/envs/sshmapping/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/home/mgray/miniforge3/envs/sshmapping/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
  File "/home/mgray/miniforge3/envs/sshmapping/lib/python3.10/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/mgray/miniforge3/envs/sshmapping/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/mgray/miniforge3/envs/sshmapping/lib/python3.10/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_asyncio.Task' object

which ultimately raises

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.

I have found a lot of very niche solutions to somewhat similar problems, but nothing has resolved this error specifically. I have also tried other libraries for interacting with data in s3 buckets, but I either get similar errors or am unable to integrate them properly with dask. Dask is my preference here because it would be nice to eventually deploy something similar on a distributed remote cluster.

Here is a GitHub repo with the scripts I have tried (NOTE: some scripts with the naming convention “…external_cluster…” require the cluster.py script to be run first).
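
Roughly, the pattern inside the class looks like this (a simplified sketch rather than the exact code in the repo; the class name, bucket pattern, and method names are placeholders):

```python
import fsspec
import xarray as xr
from dask.distributed import Client


class S3NetCDFLoader:
    """Illustrative stand-in for the class in the repo."""

    def __init__(self, pattern):
        self.client = Client()              # distributed client held on the instance
        self.fs = fsspec.filesystem("s3")   # s3 filesystem handle
        self.files = self.fs.glob(pattern)  # e.g. "my-bucket/*.nc"

    def _open_one(self, path):
        # open a single NetCDF file over s3 (h5netcdf can read file-like objects)
        with self.fs.open(path, "rb") as f:
            return xr.open_dataset(f, engine="h5netcdf").load()

    def load_all(self):
        # submitting a bound method ships `self` to the workers, so everything
        # stored on the instance -- including self.client, which holds asyncio
        # tasks -- has to be pickled, which is where the error appears
        futures = self.client.map(self._open_one, self.files)
        return self.client.gather(futures)
```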

It is clear that the parallelism works in general, but for whatever reason I can’t get it to work within a class. Is this expected behavior? Is there a workaround? My ultimate intention is for these classes to be the basis of DataLoaders in TensorFlow to improve my model training throughput. I would be more than willing to share the end result on a public forum for others to use.

Hi @Fathom-mgray, welcome to Dask Discourse!

I took a look at your code, and I see some odd things: you are using a class, but inside this class you reference external variables that are not declared as class attributes, such as files or client.

Anyway, I think the source of your problem is that the Client object cannot be serialized; see python - Pickle error when submitting task using dask - Stack Overflow.
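
For instance (an untested sketch with placeholder names echoing your example above), submitting a bound method means pickling the whole instance, and with it the Client stored on it. A module-level function that never touches the instance keeps the Client out of the task graph:

```python
import fsspec
import xarray as xr


def open_one(path):
    # runs on a worker; nothing from the loader instance is captured here
    fs = fsspec.filesystem("s3")
    with fs.open(path, "rb") as f:
        return xr.open_dataset(f, engine="h5netcdf").load()


class S3NetCDFLoader:
    # ... __init__ as before ...

    def load_all(self):
        # self._open_one would pickle `self` (and self.client) along with it;
        # a module-level function keeps the instance out of the task graph
        futures = self.client.map(open_one, self.files)
        return self.client.gather(futures)
```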

Hi @guillaumeeb, thanks for the response!

Regarding the link you posted, I don’t think I understand what is causing the problem. At what point is my Client object being serialized? In the responses, the original poster said they solved the problem by changing the client to a local variable, i.e. they either pass the client to the function or use get_client() within the function (roughly as sketched below). Neither of those approaches worked for me, but I may have been doing something wrong. I’ve also tried making the Client an instance variable. How would you pass or initialize the client to/for a class? In my experience it seems I can’t have it be a global, an instance variable, or a local function variable.
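
For example, what I tried looked roughly like this (heavily simplified; open_one is the placeholder worker function from my sketch above):

```python
from dask.distributed import Client, get_client

# variant 1: pass the client into the function doing the submitting
def load_all(files, client):
    return client.gather(client.map(open_one, files))

# variant 2: look the client up inside the function instead of passing it around
def load_all_v2(files):
    client = get_client()
    return client.gather(client.map(open_one, files))
```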

Hmm, I’m not sure actually. But you use the Client in some class methods, and I don’t know how pickle handles that. Maybe you should add serialization methods (__getstate__/__setstate__) to your class?
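
Something like this (an untested sketch, assuming the client and filesystem handles are the attributes that refuse to pickle, and reusing the placeholder class name from your sketch) might be worth trying:

```python
class S3NetCDFLoader:
    # ... __init__ and other methods unchanged ...

    def __getstate__(self):
        # drop the attributes that hold event loops / asyncio tasks before pickling
        state = self.__dict__.copy()
        state.pop("client", None)
        state.pop("fs", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # recreate the handles lazily wherever the object is unpickled
        self.client = None
        self.fs = None
```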