Raising Exceptions

I’m running delayed() on several functions and passing the resulting list of delayed functions to run().

When it fails I see in the logger:

distributed.worker - WARNING - Compute Failed
Function: execute_task
args: …
kwargs: {}
Exception: blah blah blah

However I actually want it to wait and raise an exception. It does not seem to ever hit the exception handler although the code looks like


try:
    list_of_delayed = [c.delayed(xxxx)(), c.delayed(yyyy)()]
    c.run(list_of_delayed)
except Exception as e:
    # never hits it
    pass

How do I get it to wait and raise an exception? The Dask client is set to async. I’m using the default kwargs for run(), and I see on_error: Literal['raise', 'return', 'ignore'] = 'raise', so I expected the exception to be raised in the caller.

@methodical-brian Welcome!

We usually use client.run for debugging, not for actual computations. Also, I don’t think you can pass a list of delayed objects to client.run: it takes a single regular Python function as its first argument. I’d suggest client.submit for plain functions, or client.compute for delayed objects as in the example below.

import dask
from dask.distributed import Client, LocalCluster

@dask.delayed
def my_task1():
    raise RuntimeError

@dask.delayed
def my_task2():
    raise RuntimeError

# Top-level await works in Jupyter/IPython; in a script, wrap this in an async function.
cluster = await LocalCluster(asynchronous=True)
client = await Client(cluster, asynchronous=True)

try:
    # Call the delayed functions to build the task objects
    list_of_delayed = [my_task1(), my_task2()]
    futures = client.compute(list_of_delayed)
    results = await client.gather(futures)  # the exception is raised here, on await
except Exception as e:
    print("Error:", repr(e))
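
If you run this as a plain script rather than in a notebook, top-level await isn’t available, so here is a rough sketch of the same idea wrapped in asyncio.run. The task names (ok_task, failing_task), the run_tasks helper, and the cluster options (processes=False, n_workers=1, dashboard_address=None) are my own choices for a small local test, not something from your setup, so adjust as needed.

```python
import asyncio

import dask
from dask.distributed import Client, LocalCluster


@dask.delayed
def ok_task():
    # Hypothetical task that succeeds
    return 1


@dask.delayed
def failing_task():
    # Hypothetical task that fails on the worker
    raise RuntimeError("boom")


async def run_tasks():
    # Assumed settings for a small in-process cluster: thread-based worker,
    # dashboard disabled. Both the cluster and the client are closed on exit.
    async with LocalCluster(
        asynchronous=True,
        processes=False,
        n_workers=1,
        dashboard_address=None,
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            # compute() returns futures immediately; nothing is raised yet
            futures = client.compute([ok_task(), failing_task()])
            try:
                # The worker-side exception is re-raised here, when awaited
                return await client.gather(futures)
            except RuntimeError as exc:
                return f"caught: {exc}"


if __name__ == "__main__":
    print(asyncio.run(run_tasks()))
```

The point is that client.compute only schedules the work; the try/except needs to surround the awaited client.gather call for the handler to fire.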