Pickling problem in simplest example

MCVE:

from concurrent.futures import ProcessPoolExecutor
from functools import partial

import dask
from dask.distributed import Client
from distributed.diagnostics.plugin import WorkerPlugin

def my_process(data, fixed_arg):
    pass


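class AddProcessPool(WorkerPlugin):
    """Register a ProcessPoolExecutor on each worker under the name "processes"."""

    def setup(self, worker):
        # worker.nthreads is what triggers the FutureWarning in the log below,
        # which points to worker.state.nthreads as the new location.
        executor = ProcessPoolExecutor(max_workers=worker.nthreads)
        worker.executors["processes"] = executor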

def main():
    partial_process = partial(my_process, fixed_arg="fixed_data")
    with Client(processes=False) as client:
        client.register_worker_plugin(AddProcessPool())
        with dask.annotate(executor="processes"):
            client.gather(client.map(partial_process, list(range(1000))))

if __name__ == "__main__":
    main()

Run with either python path/to/file, or pip install . && python -m mod.ule

Partial error (repeated):

/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker_state_machine.py:3357: FutureWarning: The `Worker.nthreads` attribute has been moved to `Worker.state.nthreads`
  warnings.warn(
2022-07-21 08:37:14,053 - distributed.worker - ERROR - Exception during execution of task my_process-0514e9dc6d6631bf45d18e50c3312d9a.
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker.py", line 2208, in execute
    result = await self.loop.run_in_executor(
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
2022-07-21 08:37:14,062 - distributed.worker - ERROR - Exception during execution of task my_process-04f1de90691642d43d79802ef4ff84d0.
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker.py", line 2208, in execute
    result = await self.loop.run_in_executor(
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
2022-07-21 08:37:14,063 - distributed.worker - ERROR - Exception during execution of task my_process-ec0c8bbbd0b9a7d11117f2690ad9a733.
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
"""

...

@hameer-spire Thanks for this question. @ncclementi and I are able to reproduce this, and we’ve opened an issue for it: "PicklingError when using custom ProcessPoolExecutor", issue #6803 in dask/distributed on GitHub.
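
For anyone reading the traceback: the "it's not the same object as __main__.my_process" message comes from the standard pickle module, which serializes functions by reference (module name plus qualified name) and refuses when the name resolves to a different object. The sketch below reproduces only that check, without Dask. It is an illustration, not a diagnosis of the linked issue: the helpers clone_function and pickles_by_reference are hypothetical names, json.dumps stands in for my_process, and the guess that the worker ends up holding a second copy of the function is an assumption that this thread does not confirm.

```python
import json
import pickle
import types


def clone_function(func):
    """Build a second function object with the same code, module and qualified name.

    Hypothetical helper: it imitates a function that was rebuilt from a
    by-value serialization, so it is equal in behaviour but not in identity.
    """
    clone = types.FunctionType(func.__code__, func.__globals__, name=func.__name__)
    clone.__qualname__ = func.__qualname__
    clone.__module__ = func.__module__
    return clone


def pickles_by_reference(func):
    """Return True if the standard pickle module accepts func, False otherwise."""
    try:
        pickle.dumps(func)
    except pickle.PicklingError:
        # Raised when module.qualname resolves to a different object than func.
        return False
    return True


# json.dumps is only a stand-in for any importable module-level function.
original_ok = pickles_by_reference(json.dumps)
clone_ok = pickles_by_reference(clone_function(json.dumps))
print(original_ok, clone_ok)
```

The original function should pickle and the clone should be rejected, even though both carry the same module and name. That matches the shape of the error above, where ProcessPoolExecutor uses the standard pickler to send the task to a child process.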
