Hi all. I’m looking for a way to handle the following situation:
- I’ve built up a collection of embarrassingly parallel computations using the `delayed` decorator
- The computations are then executed using `dask.compute`
- Currently, if one of those calculations fails, all of them fail. In other words, it’s all or nothing
- I want to avoid that situation and instead have all the calculations be independent from one another
import dask
from dask.distributed import Client

@dask.delayed
def weird_inc(x):
    if x == 5:
        raise ValueError('5 encountered!')
    return x + 1

delayed_calcs = [weird_inc(i) for i in range(10)]
Calling `dask.compute(delayed_calcs)` raises the exception and causes all other computations to fail.
/var/folders/cz/h62vtdfx6y13mdktwx54vx8c0000gp/T/ipykernel_23868/3809100349.py in weird_inc(x)
      5 def weird_inc(x):
      6     if x == 5:
----> 7         raise ValueError('5 encountered!')
      8     return x + 1
      9
ValueError: 5 encountered!
If I’d used the futures interface to execute the computations asynchronously, this issue would go away. However, in my actual use case I’ve used the `delayed` decorator extensively to code up the business logic, so it’s non-trivial to switch to the futures interface.
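For reference, here is a minimal sketch of what the futures-based version could look like. It assumes the logic is available as a plain, undecorated function; `plain_weird_inc` and `run_independently` are hypothetical names used only for illustration, and `Client(processes=False)` is chosen just to keep the example self-contained.

```python
from dask.distributed import Client, as_completed


def plain_weird_inc(x):
    # Hypothetical undecorated counterpart of weird_inc.
    if x == 5:
        raise ValueError('5 encountered!')
    return x + 1


def run_independently(client, inputs):
    """Hypothetical helper: submit one future per input and collect outcomes."""
    futures = client.map(plain_weird_inc, inputs)
    results, errors = [], []
    # as_completed yields each future once it has finished or errored,
    # in completion order rather than submission order.
    for future in as_completed(futures):
        if future.status == 'error':
            errors.append(future.exception())
        else:
            results.append(future.result())
    return results, errors


if __name__ == '__main__':
    # In-process scheduler and workers; an assumption made for the sketch.
    with Client(processes=False) as client:
        results, errors = run_independently(client, range(10))
        print(sorted(results))
        print(errors)
```

Each future succeeds or fails on its own, so one bad input does not prevent the other results from being collected.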
As a workaround, I’m trying to convert the delayed calculations to their futures counterparts. Matt suggested in this GitHub issue to use `persist` combined with `wait` to achieve this goal. However, based on some quick testing I’d done (see my comment in the issue), I wasn’t sure whether that’s still the canonical way to achieve this in newer versions of dask and distributed.
Based on the 3rd bullet in this doc, I opted for `Client.compute` instead.
client = Client()
futures = client.compute(delayed_calcs)

for future in futures:
    if future.status == 'error':
        print('Error encountered!')
    else:
        print(future.result())
1
2
3
4
5
Error encountered!
7
8
9
10
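One caveat with the loop above: `client.compute` returns immediately, so a future may still be `'pending'` when its status is read, in which case `future.result()` would block and then re-raise the exception. A minimal sketch of a more defensive variant follows; it calls `wait` before inspecting any status. The helper name `collect_results` is hypothetical, and `Client(processes=False)` is used only to keep the example self-contained.

```python
import dask
from dask.distributed import Client, wait


@dask.delayed
def weird_inc(x):
    if x == 5:
        raise ValueError('5 encountered!')
    return x + 1


def collect_results(client, delayed_calcs):
    """Hypothetical helper: run each delayed object as its own future.

    Returns a list of (ok, value) pairs in input order, where value is
    the result on success or the exception object on failure.
    """
    futures = client.compute(delayed_calcs)  # one future per delayed object
    wait(futures)  # block until every future has finished or errored
    outcomes = []
    for future in futures:
        if future.status == 'error':
            outcomes.append((False, future.exception()))
        else:
            outcomes.append((True, future.result()))
    return outcomes


if __name__ == '__main__':
    # In-process scheduler and workers; an assumption made for the sketch.
    with Client(processes=False) as client:
        delayed_calcs = [weird_inc(i) for i in range(10)]
        for ok, value in collect_results(client, delayed_calcs):
            print(value if ok else f'Error encountered: {value!r}')
```

Because every future has left the `'pending'` state by the time the loop runs, the status check no longer depends on timing.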
Is this a reasonable alternative to using `persist` and `wait` in combination? Specifically, is the `if future.status == ...` check an anti-pattern? Thanks!