Problems with pickling an object

Hi!

I am a beginner in parallel computing, so I am sorry if the answer to my question is obvious.
I am trying to distribute a function of a pyGIMLi object (as I understand it, although their software is in Python, it is based on C++ core libraries) across multiple CPUs, and I get an error that the function cannot be pickled.

This is the code I use to distribute:

    import numpy as np
    import pygimli as pg
    from dask.distributed import Client, LocalCluster

    def nonlinear(m, tt, ndata):
        # compute the dense Jacobian for one model vector m
        G = np.zeros((ndata, tt.inv.parameterCount))
        tt.Velocity = pg.Vector(np.float64(1. / m.flatten()))
        tt.fop.createJacobian(1. / tt.Velocity)
        G = pg.utils.sparseMatrix2Dense(tt.fop.jacobian())
        return G

    cluster = LocalCluster(n_workers=4, dashboard_address=':8787')
    client = Client(cluster)
    print(client)

    tt = setup.tt  # pyGIMLi object created earlier in my script
    for i, par in enumerate(m):
        f = client.submit(nonlinear, par, tt, ndata, key=f"{nonlinear.__name__}{i}")

    cluster.close()
    client.close()

and this is the error I get:

22/02/22 - 13:32:32 - pyGIMLi - INFO - Creating refined mesh (secnodes: 2) to solve forward task.
Setting up nonlinear forward solver
Mesh: Nodes: 8580 Cells: 8385 Boundaries: 16964
16.666668
22/02/22 - 13:32:39 - pyGIMLi - INFO - Found 1 regions.
22/02/22 - 13:32:39 - pyGIMLi - INFO - Creating forward mesh from region infos.
Data: Sensors: 50 data: 625, nonzero entries: ['g', 's', 't', 'valid']
22/02/22 - 13:32:39 - pyGIMLi - INFO - Creating refined mesh (secnodes: 2) to solve forward task.
6.029357194900513
<Client: 'tcp://127.0.0.1:39387' processes=4 threads=12, memory=31.15 GiB>
Traceback (most recent call last):

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)

RuntimeError: Pickling of "pgcore._pygimli_.RInversion" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/home/Desktop/parallel_jacobian.py", line 165, in <mod C++ core libraryule>
    f = client.submit(nonlinear,par, tt, ndata , key=f"{nonlinear.__name__}{i}")

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/distributed/client.py", line 1742, in submit
    futures = self._graph_to_futures(

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/distributed/client.py", line 2882, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/dask/highlevelgraph.py", line 1054, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/dask/highlevelgraph.py", line 425, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(d.keys(), map(func, d.values())))

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/distributed/worker.py", line 4480, in dumps_task
    return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/distributed/worker.py", line 4489, in warn_dumps
    b = dumps(obj, protocol=4)

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)

  File "/home/anaconda3/envs/pg/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)

RuntimeError: Pickling of "pgcore._pygimli_.RInversion" instances is not enabled (http://www.boost.org/libs/python/doc/v2/pickle.html)

Is there a different way to serialize this object without pickle? I tried distributing this function with other parallel computing libraries (joblib, ipyparallel) and I get the same error. I thought that maybe Dask has a solution for this.

Thanks in advance for your answer!

Hi @Lori and welcome to discourse! I’m not too familiar with the pyGIMLi library, but I wonder if trying Dask’s other serialization options might help? We’d also be happy to try out some of these options, or another workaround, if you can provide a reproducible code snippet (the one you shared is helpful as a starting point).
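
For reference, the “Extend” section of Dask’s serialization docs shows how custom serialization functions can be registered for a particular class. A minimal sketch of that pattern (using a made-up Human class, as in the docs, rather than a pyGIMLi type, since I haven’t tried this with pgcore objects) looks roughly like this:

    # Sketch of registering custom Dask serialization for a class, adapted
    # from the docs. `Human` is a made-up example class, not a pyGIMLi type.
    from distributed.protocol import dask_serialize, dask_deserialize

    class Human:
        def __init__(self, name):
            self.name = name

    @dask_serialize.register(Human)
    def serialize_human(human):
        header = {}
        frames = [human.name.encode()]
        return header, frames

    @dask_deserialize.register(Human)
    def deserialize_human(header, frames):
        return Human(frames[0].decode())

Whether something similar could be written for the pgcore types, I’m not sure.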


Hi @scharlottej13, thank you for the quick reply!
I don’t know if I did it correctly, but I tried specifying ‘dask’ or ‘msgpack’ under ‘serializers’ and ‘deserializers’ in Client(), and it always uses cloudpickle for that function. Maybe I am doing it wrong?

My code is actually very similar to the one in VAE_SGD, but I am trying to evaluate tt.fop.createJacobian(1./tt.velocity) for several velocity models at the same time.

No problem @Lori! From looking at this issue (Pickling issue pygimli + BERT) in the pyGIMLi repo, it seems pickling of pyGIMLi objects is not yet supported. I’m not sure if Dask’s other serializer options will support a pyGIMLi object, could you share a minimal reproducer for how you’re specifying the serializers?


I didn’t specify any custom serializers, if that is what you are asking. I simply tried
client = Client(cluster, serializers=['dask'], deserializers=['dask'])
or
client = Client(cluster, serializers=['dask', 'msgpack'], deserializers=['dask', 'msgpack'])

Indeed, I had already seen the issue you linked to, and Dask was suggested there: Pickling issue Pygimli + BERT #270

Hi @Lori, sorry for the back and forth on this. Upon closer reading of the Serialization - Dask.distributed docs, functions are currently only serialized by pickle and cloudpickle:

Computational tasks like f(x) that are defined and serialized on client processes and deserialized and run on worker processes. These are serialized using a fixed scheme decided on by those libraries. Today this is a combination of pickle and cloudpickle.

Here’s a small Python snippet to show this:

    from distributed.protocol import serialize
    import numpy as np

    def my_func(x):
        return len(x)

    data = np.arange(20)

    # produces an error, "Could not serialize object of type function."
    # serialize(my_func, serializers=['dask', 'msgpack'])

    s_func = serialize(my_func)
    print(f"serializer type: {s_func[0]['serializer']}")
    s_data = serialize(data)
    print(f"serializer type: {s_data[0]['serializer']}")