Gracefully handle all-or-nothing delayed computations

Hi all. I’m looking for a way to handle the following situation

  • I’ve built up a collection of embarrassingly parallel computations using the delayed decorator
  • The computations are then executed using dask.compute
  • Currently if one of those calculations failed, all calculations would fail. In other words, it’s all or nothing
  • I want to avoid that situation and instead have all the calculations be independent from one another
import dask
from dask.distributed import Client

@dask.delayed
def weird_inc(x):
    if x == 5:
        raise ValueError('5 encountered!')
    return x + 1

delayed_calcs = [weird_inc(i) for i in range(10)]

Calling dask.compute(delayed_calcs) raises the exception and causes all other computations to fail.

/var/folders/cz/h62vtdfx6y13mdktwx54vx8c0000gp/T/ipykernel_23868/3809100349.py in weird_inc(x)
      5 def weird_inc(x):
      6     if x == 5:
----> 7         raise ValueError('5 encountered!')
      8     return x + 1
      9 

ValueError: 5 encountered!

If I’d used the futures interface to execute the computations asynchronously this issue would go away. However, in my actual use case I’ve used the delayed decorator extensively to code up the business logic so it’s non-trivial to switch to using the futures interface.

As a workaround, I’m trying to convert the delayed calculations to their futures counterpart. Matt suggested in this GitHub issue to use persist combined with wait to achieve this goal. However, based on some quick testing I’d done (see my comment in the issue), I wasn’t sure whether that’s still the canonical way to achieve this in newer versions of dask and distributed.

Based on the 3rd bullet in this doc, I opted for Client.compute instead.

client = Client()
futures = client.compute(delayed_calcs)

for future in futures:
    if future.status == 'error':
        print('Error encountered!')
    else:
        print(future.result())
1
2
3
4
5
Error encountered!
7
8
9
10

Is this a reasonable alternative to using persist and wait in combination? Specifically, is the if future.status == ... check an anti-pattern? Thanks!

1 Like

I think Client.compute + checking the status of the Futures is a totally reasonable thing to do here, as is checking future.status == ....

Another thing to consider is using as_completed instead of the for loop, so you see results as they happen (unless iteration order matters, of course!):

client = Client()
futures = client.compute(delayed_calcs)

for future, result in distributed.as_compted(futures, with_results=True, raise_errors=False):
    if future.status == 'error':
        print(f'Error: {result}!')
    else:
        print(result)

persist + wait can work too, as you pointed out in the issue:

persisted = dask.persist(*delayed_calcs)
done, not_done = wait(persisted)
assert not not_done
for f in done:
    if f.status == "error":
        print(f"Error: {f.exception()}")
    else:
        print(f.result())

I don’t think one is much better than the other. The nice thing about Client.compute + as_completed is that, if you don’t keep a reference to futures in your own code, I think you can get the cluster to release the results as soon as they’re done. With persist, all the data will stay in memory on the cluster until every task is done and you delete the Futures. If your results are large, this could use more memory than you want.

It’s unfortunate that there are so many different ways to do the same thing!

Thanks for the thorough reply, Gabe!