Dask.delayed and custom workflows

I am a newbie to Dask and am evaluating whether it is the best framework for my use case. Let me describe what I am trying to achieve:

  • I have an existing Python application that executes a DAG of nodes. Execution currently happens within a single process, but this is quite slow and I want to run it over a cluster of machines.

  • Each node in the DAG is a Python function that takes some inputs and creates an output. Inputs and outputs are Python dataclass objects.

  • A few of these functions (nodes) also need to parallelise certain operations further. As a concrete example, I want to replace a pandas DataFrame with its Dask equivalent to speed those operations up even more.

I would like to understand whether I can use Dask to convert my DAG nodes into dask.delayed tasks, so that the nodes execute in a distributed, parallel manner, while also using Dask collections such as DataFrames to parallelise certain operations further inside some of the nodes. From what I have read online, this does not seem to be supported. If that is the case, could you suggest any alternatives? Thank you!
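The outer level of this setup (plain Python functions exchanging dataclass objects) maps fairly directly onto dask.delayed. Below is a minimal sketch of that level only, with no nested parallelism yet; RawData, Stats, load, summarize and combine are hypothetical names standing in for the real nodes.

```python
from dataclasses import dataclass

import dask
from dask import delayed


@dataclass
class RawData:  # hypothetical node output type
    values: list


@dataclass
class Stats:  # hypothetical node output type
    total: float
    count: int


def load(n: int) -> RawData:
    # Hypothetical source node.
    return RawData(values=[float(i) for i in range(n)])


def summarize(raw: RawData) -> Stats:
    # Hypothetical intermediate node: consumes one dataclass, returns another.
    return Stats(total=sum(raw.values), count=len(raw.values))


def combine(a: Stats, b: Stats) -> float:
    # Hypothetical sink node: mean over both branches.
    return (a.total + b.total) / (a.count + b.count)


# Wrapping each call in delayed builds the DAG lazily; nothing runs yet.
left = delayed(summarize)(delayed(load)(4))
right = delayed(summarize)(delayed(load)(6))
mean = delayed(combine)(left, right)

# The two independent branches can run in parallel. Without an active
# dask.distributed Client this uses the local threaded scheduler; once a
# Client is created, the same call runs on the cluster instead.
(result,) = dask.compute(mean)
print(result)
```

Dask passes the computed dataclass objects between tasks as ordinary Python objects, so the node functions themselves do not need to change.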

Hi @bb227, welcome to Dask community!

So you are describing two levels of parallelism, which is always tricky to manage with a single tool.

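If a node is executed as a Dask Delayed task, then it runs inside a task, and if you want to use a Dask DataFrame in that node, you'll end up launching tasks from tasks. This is feasible, but you need to be careful: a task that blocks while waiting for its sub-tasks keeps holding a worker thread, which can deadlock the cluster unless the task secedes from the worker's thread pool. The worker_client context manager takes care of that for you.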
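For reference, here is a minimal sketch of the general tasks-from-tasks pattern using plain futures rather than a DataFrame. square and node are hypothetical names, and processes=False is only an assumption to keep the local test cluster inside a single process.

```python
from dask.distributed import Client, worker_client


def square(x):
    # Hypothetical sub-task launched from inside another task.
    return x * x


def node(values):
    # worker_client() secedes this task from the worker's thread pool while
    # it waits, so the sub-tasks it submits can use that thread.
    with worker_client() as inner:
        futures = inner.map(square, values)
        return sum(inner.gather(futures))


# processes=False runs the scheduler and workers in this process (quick local test).
with Client(processes=False) as client:
    total = client.submit(node, [1, 2, 3, 4]).result()
    print(total)
```

The outer task only orchestrates; the actual work is spread across the workers as independent sub-tasks.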

A really simple example that works:

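```python
import time

import dask
import dask.dataframe as dd
import pandas as pd
from dask import delayed
from dask.distributed import Client, worker_client


def node():
    time.sleep(0.5)  # stand-in for the node's own (serial) work
    # worker_client() secedes this task from the worker's thread pool while
    # the inner Dask DataFrame computation runs, so it does not block a thread.
    with worker_client():
        df = pd.DataFrame({
            'height': [6.21, 5.12, 5.85, 5.78, 5.98],
            'weight': [150, 126, 133, 164, 203]
        })
        # .compute() inside the task submits the inner graph to the same cluster.
        return dd.from_pandas(df, npartitions=2).compute()


if __name__ == "__main__":
    client = Client()  # local cluster; pass a scheduler address to use a real one
    graph = delayed(node)()
    (result,) = dask.compute(graph)
    print(result)
```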

Thanks a lot @guillaumeeb, this is exactly what I was looking for!