Overview
In short, I love Dask distributed, but I also love the decorators from Dask delayed. I have an approach that kind of works, but I'm receiving a PicklingError in one edge case that I'd love insight on.
Setup
Without any decorators, let’s consider the following operations. Obviously, in reality, they would be high-compute tasks.
from dask.distributed import Client

client = Client()

def add(a, b):
    return a + b

def make_more(val):
    return [val] * 3

def add_distributed(vals, c):
    return [add(val, c) for val in vals]

out1 = client.submit(add, 1, 2)                 # 3
out2 = client.submit(make_more, out1)           # [3, 3, 3]
out3 = client.submit(add_distributed, out2, 3)  # [6, 6, 6]
out3.result()
I understand that it is possible to use .map() here instead of .submit() with some refactoring, but in the spirit of decorators, I want to keep the underlying code largely intact.
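For reference, the sort of .map() refactor I have in mind might look something like the following (assuming the intermediate list from out2 is materialized on the client first):

vals = out2.result()                  # [3, 3, 3], pulled back to the client
futures = client.map(add, vals, b=3)  # one future per element
results = client.gather(futures)      # [6, 6, 6]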
Decorator Approach
Defining the Decorator
Let's define a decorator @remote_execute that is simply the following:
import functools
from dask.distributed import Client

client = Client()

def remote_execute(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = client.submit(func, *args, **kwargs)
        return future
    return wrapper
Presumably, in practice I should be passing in the client somehow so I don't end up instantiating it twice, but that seems solvable.
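For example, one rough way to do that might be a decorator factory that takes the client explicitly (just a sketch; the names are illustrative):

def remote_execute(client):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return client.submit(func, *args, **kwargs)
        return wrapper
    return decorator

# usage: @remote_execute(client) instead of a bare @remote_execute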
Trying it Out
Now, we rewrite the above routines:
from dask.distributed import Client

client = Client()

@remote_execute
def add(a, b):
    return a + b

@remote_execute
def make_more(val):
    return [val] * 3

@remote_execute
def add_distributed(vals, c):
    return [add(val, c) for val in vals]

out1 = add(1, 2)                 # 3
out2 = make_more(out1)           # [3, 3, 3]
out3 = add_distributed(out2, 3)  # [6, 6, 6]
out3.result()
The Problem
Unfortunately, when running the above example, I get the following traceback:
2023-12-06 20:17:27,910 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x1d3b9db68d0>
0. 2008808932416
>.
Traceback (most recent call last):
  File "c:\Users\asros\miniconda\envs\quacc\Lib\site-packages\distributed\protocol\pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function add_distributed at 0x000001D3B8BB2DE0>: it's not the same object as __main__.add_distributed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\asros\miniconda\envs\quacc\Lib\site-packages\distributed\protocol\pickle.py", line 68, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function add_distributed at 0x000001D3B8BB2DE0>: it's not the same object as __main__.add_distributed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\asros\miniconda\envs\quacc\Lib\site-packages\distributed\protocol\pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\asros\miniconda\envs\quacc\Lib\site-packages\cloudpickle\cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "c:\Users\asros\miniconda\envs\quacc\Lib\site-packages\cloudpickle\cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle 'TaskStepMethWrapper' object
Any ideas on how to address this? Running a two-step workflow consisting of add then make_more works perfectly, but it's that third task that really throws a wrench in things.
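For reference, the two-step version that completes without error is simply:

out1 = add(1, 2)        # future -> 3
out2 = make_more(out1)  # future -> [3, 3, 3]
out2.result()           # [3, 3, 3]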
I’m open to other ideas too if there are better ways to integrate decorators into this approach!