MCVE:
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import dask
from dask.distributed import Client
from distributed.diagnostics.plugin import WorkerPlugin
def my_process(data, fixed_arg):
    """Placeholder task for the reproduction; does nothing and returns None.

    Args:
        data: The per-task item (one element of the mapped sequence).
        fixed_arg: The value bound ahead of time with ``functools.partial``.
    """
    pass
class AddProcessPool(WorkerPlugin):
    """Worker plugin that registers a process pool under the name "processes"."""

    def setup(self, worker):
        """Create a process pool and store it in ``worker.executors``.

        Args:
            worker: The worker this plugin is being set up on.
        """
        # ``Worker.nthreads`` is deprecated in favour of
        # ``Worker.state.nthreads``; fall back to the worker itself on
        # releases that do not have a ``state`` attribute.
        state = getattr(worker, "state", worker)
        executor = ProcessPoolExecutor(max_workers=state.nthreads)
        worker.executors["processes"] = executor
def main():
    """Run the reproduction on an in-process cluster.

    Binds ``fixed_arg`` onto ``my_process``, registers ``AddProcessPool`` on
    the workers, and maps the bound function over 1000 integers while
    annotating the tasks with ``executor="processes"``.
    """
    partial_process = partial(my_process, fixed_arg="fixed_data")
    with Client(processes=False) as client:
        client.register_worker_plugin(AddProcessPool())
        # Tasks created inside this annotation are tagged for the executor
        # that the plugin stored under the "processes" key.
        with dask.annotate(executor="processes"):
            client.gather(client.map(partial_process, list(range(1000))))
# Only run the reproduction when this file is executed as a script.
if __name__ == "__main__":
    main()
Run with: python path/to/file
or: pip install . && python -m mod.ule
Partial error output (the same traceback repeats for every task):
/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker_state_machine.py:3357: FutureWarning: The `Worker.nthreads` attribute has been moved to `Worker.state.nthreads`
warnings.warn(
2022-07-21 08:37:14,053 - distributed.worker - ERROR - Exception during execution of task my_process-0514e9dc6d6631bf45d18e50c3312d9a.
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker.py", line 2208, in execute
result = await self.loop.run_in_executor(
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
2022-07-21 08:37:14,062 - distributed.worker - ERROR - Exception during execution of task my_process-04f1de90691642d43d79802ef4ff84d0.
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/site-packages/distributed/worker.py", line 2208, in execute
result = await self.loop.run_in_executor(
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
2022-07-21 08:37:14,063 - distributed.worker - ERROR - Exception during execution of task my_process-ec0c8bbbd0b9a7d11117f2690ad9a733.
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/home/ubuntu/mambaforge/envs/pygnssr2-dev/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function my_process at 0x7f83f862ab00>: it's not the same object as __main__.my_process
"""
...