Hey everyone. I’m trying to use dask.delayed to build a DAG from a list of task objects that reference each other by id and can appear in any order (for example, the first item might depend on the last one). My initial idea was to make two passes over the list: create the Delayed objects in the first pass, then recreate them with their dependencies in the second pass. Test code and prints / logs are below. As you can see, task3 gets executed twice, once without dependencies and once with. As far as I understand, this is because each delayed(...) call creates a new graph node. Is there anything I can do here to avoid the duplicate execution, or to delete the duplicate (from my POV) node from the Dask graph?
Code:
from typing import List
from dask.distributed import Client as DaskClient
from dask import delayed
from distributed import LocalCluster
def execute_task(task_id: str, dependencies: List[str] = []):
    if dependencies:
        print(f"Task {task_id} executed with dependencies on {dependencies}")
    else:
        print(f"Task {task_id} executed without dependencies")
    return task_id

def execute(tasks, client: DaskClient):
    dask_tasks = {}
    for task in tasks:
        id = task["id"]
        print(f"Creating {id} delayed in the first loop")
        dask_tasks[task["id"]] = delayed(execute_task)(task["id"])
    for task in tasks:
        if task["dependsOn"]:
            dependencies = [dask_tasks[dependsOn] for dependsOn in task["dependsOn"]]
            id = task["id"]
            print(f"Updating {id} delayed in the second loop with dependencies")
            dask_tasks[task["id"]] = delayed(execute_task)(task["id"], dependencies)
    final_tasks = list(dask_tasks.values())
    print("Final tasks:")
    print(final_tasks)
    print("Calling compute after loop")
    futures = client.compute(final_tasks)
    print("Calling gather after loop")
    results = client.gather(futures)
    return results

# Hardcoded tasks data
tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]

if __name__ == "__main__":
    cluster = LocalCluster(processes=True)
    client = DaskClient(cluster)
    results = execute(tasks_data, client)
    for result in results:
        print(result)
Prints / logs:
Creating task0 delayed in the first loop
Creating task1 delayed in the first loop
Creating task2 delayed in the first loop
Creating task3 delayed in the first loop
Updating task0 delayed in the second loop with dependencies
Updating task2 delayed in the second loop with dependencies
Updating task3 delayed in the second loop with dependencies
Final tasks:
[Delayed('execute_task-e1f87e91-222b-4df6-9509-75d65de5a542'), Delayed('execute_task-022c83e0-bcbd-4fcb-861f-10b801cc395a'), Delayed('execute_task-efc34fa1-0239-422d-9f4d-6bf51b9ea4e3'), Delayed('execute_task-75f79595-58e6-4303-834a-5cfe2aceb30c')]
Calling compute after loop
Calling gather after loop
Task task1 executed without dependencies
Task task3 executed without dependencies
Task task2 executed with dependencies on ['task1']
Task task0 executed with dependencies on ['task3']
Task task3 executed with dependencies on ['task1', 'task2']
task0
task1
task2
task3
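For reference, here is a sketch of a single-pass alternative I'm considering, where each task id maps to exactly one Delayed, so no duplicate node ever exists. `build_delayed` is just an illustrative helper name, and it assumes the dependency graph is acyclic:

```python
from typing import Dict, List

import dask
from dask import delayed

def execute_task(task_id: str, dependencies=None):
    # Same demo task as above: by the time this runs, `dependencies`
    # holds the ids returned by the upstream tasks.
    if dependencies:
        print(f"Task {task_id} executed with dependencies on {dependencies}")
    else:
        print(f"Task {task_id} executed without dependencies")
    return task_id

def build_delayed(tasks) -> List:
    # Hypothetical helper: build exactly one Delayed per task id,
    # resolving dependencies recursively. Order in the input list no
    # longer matters; cycles would recurse forever, so the graph must
    # be acyclic.
    by_id = {task["id"]: task for task in tasks}
    cache: Dict[str, object] = {}

    def node(task_id: str):
        if task_id not in cache:
            deps = [node(dep) for dep in by_id[task_id]["dependsOn"]]
            cache[task_id] = delayed(execute_task)(task_id, deps)
        return cache[task_id]

    return [node(task["id"]) for task in tasks]

tasks_data = [
    {"id": "task0", "dependsOn": ["task3"]},
    {"id": "task1", "dependsOn": []},
    {"id": "task2", "dependsOn": ["task1"]},
    {"id": "task3", "dependsOn": ["task1", "task2"]},
]

# Each task id now maps to a single graph node, so each runs exactly once.
results = dask.compute(*build_delayed(tasks_data))
```

The distributed path should stay the same, i.e. `client.compute(build_delayed(tasks_data))` followed by `client.gather(...)`, since only the graph construction changes.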