Passing `@delayed` functions to other `@delayed` functions

Question

What is the recommended way to deal with passing @delayed-decorated functions to other @delayed-decorated functions while respecting concurrency and without rewriting the underlying Python functions?

Consider the following toy example:

import time
import dask
from dask.distributed import Client

@dask.delayed
def job():
    print('Sleeping')
    time.sleep(5)
    return True

@dask.delayed
def get_n():
    return 5

@dask.delayed
def subflow(op, n):
    results = []
    for _ in range(n):
        results.append(op())
    return results

def workflow():
    n = get_n()
    return subflow(job, n)

Calling

client = Client()
dask.compute(workflow())

results in no concurrency, which is not desirable. This can be seen from the five “Sleeping” statements printing sequentially rather than all at once.

What is the ideal approach in this scenario?

Undesired Solution

Naively, one might decide to strip off the @delayed decorator from subflow. However, then the for loop can’t proceed, since the value of n is not known a priori. To resolve this, you can also call .compute() on n = get_n() to ensure n is known, but I really strive not to insert blocking .compute() calls into intermediate steps of a workflow.
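
Concretely, that undesired workaround would look roughly like this with the toy example above (a sketch, not shown in full earlier):

def subflow(op, n):  # decorator stripped off
    results = []
    for _ in range(n):
        results.append(op())  # op is still the @delayed job, so this builds Delayed objects
    return results

def workflow():
    n = get_n().compute()  # blocking call just to learn the loop length
    return subflow(job, n)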

Context

It may not be initially obvious, but the reason the title of this question is about passing @delayed functions to other @delayed functions is that the following variant, where job is called directly inside a plain subflow rather than passed in as an argument, does achieve concurrency (albeit still at the cost of a blocking .compute() call to resolve n).

@dask.delayed
def job():
    print('Sleeping')
    time.sleep(5)
    return True

@dask.delayed
def get_n():
    return 5

def subflow(n):
    results = []
    for _ in range(n):
        results.append(job())
    return results

def workflow():
    n = get_n().compute()
    return subflow(n)

Hi @arosen93,

First of all, in most cases you don’t want to do this. Your real use case is probably much more complex than this example, but mixing control flow (building a task graph) with conditions on future computations is tricky.

I had trouble getting something working, and in the end I just removed the annotations for better control over what I was doing. I ended up with:

def job():
    print('Sleeping')
    time.sleep(5)
    return True

def get_n():
    return 5

def subflow(op, n):
    results = []
    for _ in range(n):
        results.append(dask.delayed(op)())
    return dask.compute(*results)

def workflow():
    n = dask.delayed(get_n)()
    return dask.delayed(subflow)(job, n)

As you can see, I introduce a dask.compute call inside the subflow function. There are other considerations that should be taken care of, like using secede and rejoin; see Launch Tasks from Tasks — Dask.distributed 2023.12.1 documentation.
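
A minimal sketch of that pattern could use the worker_client context manager from dask.distributed, which handles secede and rejoin for you; note the pure=False, which keeps the n identical calls to op from being collapsed into a single task:

from dask.distributed import worker_client

def subflow(op, n):
    # worker_client secedes from the worker's thread pool while this task waits,
    # so the blocked subflow task does not hold on to a task slot
    with worker_client() as client:
        futures = [client.submit(op, pure=False) for _ in range(n)]
        return client.gather(futures)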

You could also do something like:

def job():
    print('Sleeping')
    time.sleep(5)
    return True

def get_n():
    return 5

def subflow(op, n):
    results = []
    for _ in range(n):
        results.append(dask.delayed(op)())
    return results

def workflow():
    n = dask.delayed(get_n)()
    return dask.delayed(subflow)(job, n)

dask.compute(workflow().compute())
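
(Here the inner workflow().compute() call executes subflow and returns the list of Delayed objects it builds; the outer dask.compute then runs those delayed job calls concurrently.)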

@guillaumeeb: Thanks for the reply!

This is one of those scenarios where this logic is intentional given the structure of my library. I am shipping a library of pre-defined compute tasks and workflows, and the user can swap out individual tasks in the workflow via an optional keyword argument. That ultimately leads to this scenario of passing @delayed functions to @delayed functions. Actually, if you are curious, the true motivation is that the package I’m building is a library of workflows meant to be interoperable with several different workflow engines, and everything “just works” except for this one edge case with Dask.

I had independently concluded that I need to call .compute() on the result of subflow. I have reframed your answer to more closely match my original post, and I have a question/potential issue to raise.

def job():
    print('Sleeping')
    time.sleep(5)
    return True

@dask.delayed
def get_n():
    return 5

@dask.delayed
def subflow(op, n):
    results = []
    for _ in range(n):
        results.append(dask.delayed(op)())
    return results

def workflow():
    n = get_n()
    return subflow(job, n).compute()

This achieves concurrency. However, if the @dask.delayed decorator is added to job (and removed from the dask.delayed(op) call), concurrency is broken. Is this the intended behavior? It seems like this might be an undesired issue that crops up when a @delayed function is used as an argument to a @delayed-decorated function. I opened an issue here.

I have to admit I really don’t know. This might well be some bug.

The “delayed” part is stripped off the function when you pass a delayed function to another delayed function, so it effectively turns into passing a normal function to a delayed function.
If you do print(op) in your delayed subflow, it will show a plain function object rather than a Delayed object. The same happens with get_n: inside subflow, n is already a plain int.
I hope that clarifies it.
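
For example, a quick isolated check along these lines shows the unwrapping (a sketch, not from the code above):

import dask

@dask.delayed
def job():
    return True

@dask.delayed
def probe(op):
    # op arrives as the underlying plain function, not as a Delayed object,
    # because Dask resolves Delayed arguments before the task body runs
    print(type(op))  # <class 'function'>
    return op()

print(probe(job).compute())  # True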

Yup, that’s indeed the case. Not sure that’s ideal behavior though.