Question: if I mix dask.delayed functions with Dask DataFrames, are there any caveats to be aware of?

Will dask.delayed affect how Dask plans the DataFrame computation? I have seen some questions that seem to indicate that dask.delayed forces a compute of a DataFrame that is passed into it. Is that correct? Is there anything else to consider?

Hi @skrawcz,

Could you provide some example code of what you want to do? It’s fine to go from Delayed to DataFrame or the other way around for example, but it can be tricky to use Delayed on part of a DataFrame.

Sure. The following is a simplified sketch of the basic pattern:

from dask import delayed
import dask.dataframe as dd

@delayed
def load_df() -> dd.DataFrame:
    return dd.from_pandas(...)

@delayed
def transform1(df: dd.DataFrame) -> dd.Series:
    ...

@delayed
def transform2(df: dd.DataFrame) -> float:
    return df.column.mean()

@delayed
def transform3(df: dd.DataFrame, mean: float) -> dd.Series:
    return df.column - mean

@delayed
def new_df(t1: dd.Series, t3: dd.Series) -> dd.DataFrame:
    # join the two
    return dd.concat([t1, t3], axis=1, join="inner")

...

_df = load_df()
_t1 = transform1(_df)
_t2 = transform2(_df)
_t3 = transform3(_df, _t2)
_result = new_df(_t1, _t3).compute()

Looking at this simplified code, I wonder whether you need delayed at all.

Is there a reason why you don’t use the DataFrame API directly? It is already lazy, so it should be all right here.

Something like:

import dask.dataframe as dd

df = dd.from_pandas(...)
t1 = transform1(df)  # without delayed
t2 = df.column.mean()  # lazy scalar
t3 = df.column - t2  # lazy series
result = dd.concat([t1, t3], axis=1, join="inner").compute()

Yep, you’re right in this instance!

However, I am curious about the caveats of mixing them, since we have a framework that can inject delayed. I want to make sure I give the correct guidance on usage, and to find out whether there is anything we can do on the framework side to check for or prevent performance problems.

Okay, I think I have a better understanding about your original question now.

So in the end I played a bit with your example, filling it in with a small amount of code:

import dask
from dask import delayed
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({
    'height':  [6.21, 5.12, 5.85, 5.78, 5.98],
    'weight': [150, 126, 133, 164, 203]
})

@delayed
def load_df():
    return dd.from_pandas(df, npartitions=2)

@delayed
def transform1(df):
    print(type(df))
    print(df)
    return df['height']

@delayed
def transform2(df):
    return df.weight.mean()

@delayed
def transform3(df, mean):
    return df.weight - mean

This is a tricky subject, and I think you will need to be very careful about what you are doing.

If you correctly wrap all your dask dataframe objects inside a Delayed, then you should be OK.

When a Delayed is computed, any Delayed passed to it as input is computed first, so that its result can feed the next call. But in your case, each Delayed contains a lazy Dask DataFrame. Computing that Delayed therefore returns the Dask DataFrame structure, which is itself another Dask graph.

What happens in your example is that you are building a task graph from within Delayed calls. These delayed operations do almost nothing themselves, and in the end you need to call compute twice:

_result = new_df(_t1, _t3).compute()
_result.compute()

But be careful: if at some point you pass a Dask DataFrame object, or any other Dask collection, as input to a Delayed function, it will be computed first:

dd_not_delayed = dd.from_pandas(df, npartitions=2)
dd_not_delayed_t1 = transform1(dd_not_delayed)
dd_not_delayed_t1.compute()

will return a pandas object (here the height column, as a pandas Series), because the Dask DataFrame is computed into a single pandas DataFrame before transform1 executes.

I hope this answers some of your questions.