How to handle exceptions from a delayed task submitted with fire_and_forget?

Hi All,

Thanks for this great library. I use it as a workflow engine for my automation tasks. Each automation is a typical DAG made of multiple small steps chained together. A step may raise a Python exception to stop the run partway through. I catch that exception from the Delayed.compute() call and mark the automation as failed in the database; otherwise it is marked as succeeded.

The main problem with my current approach is its dependency on the active future behind the Delayed object: if the process holding the Dask client gets bounced, I lose track of the running task.

So I started looking for an alternative approach:

  1. Submit delayed task using dask.distributed.fire_and_forget().
  2. Track the task step exception using a worker plugin implementation.

Does this sound like a proper way to do it, or am I missing a simpler one?


Hi @dongxu, welcome to the forum, glad you like Dask!

About your problem, I'm afraid there is no easy solution. As you said, a task's existence depends on a corresponding future object, so if the Client process is no longer there, the task will be cancelled by default. See Futures — Dask documentation:

Dask will stop work that doesn’t have any active futures

The only other approach, as you also said, is to use fire_and_forget.

But then, if you need to keep track of the task or computation step, Dask won't provide anything out of the box. So, as you suggested, you can rely on plugins. A SchedulerPlugin seems more natural to me for this kind of job, but a WorkerPlugin will work too.
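To make the plugin idea concrete, here is a minimal sketch, not a definitive implementation. The names FailureRecorder and submit_detached are hypothetical, and the in-memory list stands in for whatever database write your automation needs. It assumes a recent distributed release where Client.register_plugin is available (older releases use Client.register_scheduler_plugin instead).

```python
from dask.distributed import fire_and_forget
from distributed.diagnostics.plugin import SchedulerPlugin


class FailureRecorder(SchedulerPlugin):
    """Hypothetical plugin: remember the keys of tasks that end up 'erred'."""

    name = "failure-recorder"

    def __init__(self):
        # Assumption: a plain list for illustration. The plugin instance lives
        # in the scheduler process, so a real workflow would write to a
        # database here instead of keeping state in memory.
        self.failed_keys = []

    def transition(self, key, start, finish, *args, **kwargs):
        # The scheduler calls this hook on every task state change.
        if finish == "erred":
            self.failed_keys.append(key)


def submit_detached(client, delayed_obj):
    """Run a Delayed graph without keeping a future alive on the client."""
    # Assumption: recent distributed; use register_scheduler_plugin on older ones.
    client.register_plugin(FailureRecorder())
    future = client.compute(delayed_obj)  # turn the Delayed into a Future
    fire_and_forget(future)  # the task keeps running even if this future is dropped
```

Note that fire_and_forget expects futures, so the Delayed object is first passed through client.compute. Because the plugin runs on the scheduler, it keeps recording failures even after the client process has been bounced.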

Other solutions I can think of:

  • Implement your own logic in your tasks, e.g. write a file or write to a database upon failure.
  • If you just want to keep track of errors, you might have a look at the scheduler.erred_tasks attribute.
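For the first option, one possible shape is a decorator that writes the failure to a database before re-raising. This is only a sketch under assumptions: record_failure, the step_failures table, and the use of SQLite are all hypothetical choices for illustration, and the database path must be reachable from every worker.

```python
import functools
import sqlite3
import traceback


def record_failure(db_path):
    """Hypothetical decorator: log a step's exception to SQLite, then re-raise."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                conn = sqlite3.connect(db_path)
                try:
                    with conn:  # commits on success, rolls back on error
                        conn.execute(
                            "CREATE TABLE IF NOT EXISTS step_failures "
                            "(step TEXT, error TEXT)"
                        )
                        conn.execute(
                            "INSERT INTO step_failures VALUES (?, ?)",
                            (func.__name__, traceback.format_exc()),
                        )
                finally:
                    conn.close()
                raise  # let Dask still see the task as failed

        return wrapper

    return decorator
```

Each step function would be wrapped with @record_failure("/path/to/automation.db") before being passed to dask.delayed, so the failure is persisted by the worker itself and does not depend on any client holding a future.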