Editing dask.delayed DAG nodes after creation

Hey everyone. I’m trying to use dask.delayed to construct a DAG from a list of data objects that have dependencies on each other and can appear in any order (for example, the first item might depend on the last one). My initial idea was to do two passes over the list: create Delayed objects in the first pass, then recreate/update them with their dependencies in the second pass. Test code and prints/logs are below. As you can see, task3 gets executed twice, once without dependencies and once with them. As far as I understand, this is due to how DAG nodes are created. Is there anything I can do here to avoid the duplicate execution, or to delete the (from my point of view) duplicate node from the Dask graph?

Code:

from typing import List
from dask.distributed import Client as DaskClient
from dask import delayed
from distributed import LocalCluster


def execute_task(task_id: str, dependencies: List[str] = []):
    if dependencies:
        print(f"Task {task_id} executed with dependencies on {dependencies}")
    else:
        print(f"Task {task_id} executed without dependencies")
    return task_id


def execute(tasks, client: DaskClient):
    dask_tasks = {}
    # First pass: create a Delayed for every task, without dependencies.
    for task in tasks:
        id = task["id"]
        print(f"Creating {id} delayed in the first loop")
        dask_tasks[task["id"]] = delayed(execute_task)(task["id"])

    # Second pass: re-create the Delayed for tasks that have dependencies.
    for task in tasks:
        if task["dependsOn"]:
            dependencies = [dask_tasks[dependsOn] for dependsOn in task["dependsOn"]]
            id = task["id"]
            print(f"Updating {id} delayed in the second loop with dependencies")
            dask_tasks[task["id"]] = delayed(execute_task)(task["id"], dependencies)

    final_tasks = list(dask_tasks.values())
    print("Final tasks:")
    print(final_tasks)
    print(f"Calling compute after loop")
    futures = client.compute(final_tasks)
    print(f"Calling gather after loop")
    results = client.gather(futures)
    return results


# Hardcoded tasks data
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]

if __name__ == "__main__":
    cluster = LocalCluster(processes=True)
    client = DaskClient(cluster)
    results = execute(tasks_data, client)

    for result in results:
        print(result)

Prints / logs:

Creating task0 delayed in the first loop
Creating task1 delayed in the first loop
Creating task2 delayed in the first loop
Creating task3 delayed in the first loop
Updating task0 delayed in the second loop with dependencies
Updating task2 delayed in the second loop with dependencies
Updating task3 delayed in the second loop with dependencies
Final tasks:
[Delayed('execute_task-e1f87e91-222b-4df6-9509-75d65de5a542'), Delayed('execute_task-022c83e0-bcbd-4fcb-861f-10b801cc395a'), Delayed('execute_task-efc34fa1-0239-422d-9f4d-6bf51b9ea4e3'), Delayed('execute_task-75f79595-58e6-4303-834a-5cfe2aceb30c')]
Calling compute after loop
Calling gather after loop
Task task1 executed without dependencies
Task task3 executed without dependencies
Task task2 executed with dependencies on ['task1']
Task task0 executed with dependencies on ['task3']
Task task3 executed with dependencies on ['task1', 'task2']
task0
task1
task2
task3

Hi @andrii-i, welcome to the Dask community!

I found this weird and was about to ask you to try to visualize the Dask graph, but I think I’ve got the problem.

When you update task0, you build it against the old task3 Delayed, which has not yet been updated itself. That old task3 node is therefore kept in the graph even though it is no longer in your dict: you later overwrite the dict entry, but the previous Delayed is still referenced by task0.
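
To see this, you can render the graph of everything you are about to compute; the stale task3 shows up as a second execute_task node feeding task0. A minimal sketch, assuming the final_tasks list built in the code above and the optional graphviz dependency for rendering:

import dask

# Renders the combined graph of all final Delayed objects to tasks.png.
# Two distinct execute_task nodes for task3 appear: the stale one still
# referenced by task0, and the re-created one that carries the dependencies.
dask.visualize(*final_tasks, filename="tasks.png")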

Hi @andrii-i,

The problem is that you are

  1. creating a delayed object for task A without dependencies
  2. creating another delayed object for task B that depends on A
  3. re-creating the delayed object for task A, this time with the correct dependencies - let’s call it A’

This won’t work, because task B embeds the whole graph of the original task A; when you create A’, it is treated as a completely different object with its own key and its own subgraph, while B keeps pointing at the original A.
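
You can check this by comparing keys on a stripped-down version of the pattern (f below is just a stand-in function, not your execute_task):

from dask import delayed

def f(task_id, dependencies=None):
    return task_id

a = delayed(f)("A")              # task A, first pass, no dependencies
b = delayed(f)("B", [a])         # task B embeds A's whole graph
a2 = delayed(f)("A", ["C"])      # "updated" A - actually a brand-new node

print(a.key == a2.key)                     # False: different keys, different nodes
print(a.key in dict(b.__dask_graph__()))   # True: B still references the original A
print(a2.key in dict(b.__dask_graph__()))  # False: B knows nothing about A'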

You should resolve the DAG order yourself, exactly as if you were not using Dask, by caching task outputs - with the big difference that the “output” of a delayed task is just a lightweight Delayed object (metadata), not a computed result. This is a lot simpler to do than to describe:

from functools import lru_cache
import dask
from distributed import Client, LocalCluster

@dask.delayed(pure=True)
def execute_task(task_id: str, dependencies: list[str]):
    if dependencies:
        print(f"Task {task_id} executed with dependencies on {dependencies}")
    else:
        print(f"Task {task_id} executed without dependencies")
    return task_id


def execute(tasks):
    tasks = {task["id"]: task for task in tasks}

    @lru_cache
    def make_task(task_id):
        # Build the Delayed for each dependency first (recursively);
        # lru_cache ensures each task id maps to exactly one Delayed object.
        deps = tasks[task_id]["dependsOn"]
        return execute_task(task_id, [make_task(dep_id) for dep_id in deps])

    final_tasks = [make_task(task_id) for task_id in tasks]
    print("Final tasks:")
    print(final_tasks)
    print(f"Calling compute after loop")
    return dask.compute(*final_tasks)


# Hardcoded tasks data
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]

if __name__ == "__main__":
    with LocalCluster(processes=True) as cluster:
        with Client(cluster) as client:
            results = execute(tasks_data)
            for result in results:
                print(result)

Output:

Final tasks:
[Delayed('execute_task-36212c7784c41c0fb3c76fdd9445830b'), Delayed('execute_task-5f490fa0f969d97ca0992b125106a232'), Delayed('execute_task-2f227da2d5d25ae0624e8145a63badbd'), Delayed('execute_task-dd5fcd06c99938a9c258b2c80642303d')]
Calling compute after loop
Task task1 executed without dependencies
Task task2 executed with dependencies on ['task1']
Task task3 executed with dependencies on ['task1', 'task2']
Task task0 executed with dependencies on ['task3']
task0
task1
task2
task3

Above I used a local @lru_cache for convenience, but I might as well have written the caching system myself:

    cache = {}
    def make_task(task_id):
        try:
            return cache[task_id]
        except KeyError:
            deps = tasks[task_id]["dependsOn"]
            task = execute_task(task_id, [make_task(dep_id) for dep_id in deps])
            cache[task_id] = task
            return task

IMPORTANT: in the above examples the cache is only a minor performance optimization, because execute_task is pure - calling it twice with the same arguments produces a node with the exact same key, and therefore the exact same DAG, both times.
If it were impure, the cache would be crucial to avoid scheduling and computing the same task over and over again.
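
For reference, pure=True is what makes two identical calls collapse into one node: the key becomes a deterministic hash of the function and its arguments instead of a random UUID. A quick illustration with a toy function g:

from dask import delayed

def g(x):
    return x

# pure=True: deterministic, content-based keys, so identical calls share one node.
print(delayed(g, pure=True)("task1").key == delayed(g, pure=True)("task1").key)  # True

# Default (impure): UUID-based keys, so identical calls become separate nodes.
print(delayed(g)("task1").key == delayed(g)("task1").key)  # False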

To demonstrate, here’s what happens if I both remove @lru_cache and drop pure=True from @dask.delayed - every recursive make_task call now creates a brand-new node, so each task is duplicated once per path that reaches it in the DAG:

[Delayed('execute_task-f3f2d8fa-4d53-468f-bbfa-4cef9aff0648'), Delayed('execute_task-1635f9df-f73c-4658-9562-a5d0ded11f0b'), Delayed('execute_task-bbd69416-490f-4de5-b6d1-479d10cad1e4'), Delayed('execute_task-61805a71-f5bf-4ba3-a349-8f1c8be5f7e6')]
Calling compute after loop
Task task1 executed without dependencies
Task task1 executed without dependencies
Task task1 executed without dependencies
Task task1 executed without dependencies
Task task1 executed without dependencies
Task task1 executed without dependencies
Task task2 executed with dependencies on ['task1']
Task task2 executed with dependencies on ['task1']
Task task2 executed with dependencies on ['task1']
Task task3 executed with dependencies on ['task1', 'task2']
Task task3 executed with dependencies on ['task1', 'task2']
Task task0 executed with dependencies on ['task3']
task0
task1
task2
task3

Thank you, I much appreciate the answer and guidance - it works great. This also taught me something new about Python, thank you.
