Duplicated task stream in Dask dashboard

Hi,

First, thank you for creating (and supporting) Dask, it is amazing.

I have a short Python notebook publicly available here (dask-duckdb-dbeaver/out_of_memory_ETL.ipynb at main · akbaritabar/dask-duckdb-dbeaver · GitHub). It includes two custom functions that use Dask delayed.
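For context, the overall structure is roughly like this (a simplified sketch with made-up function names, not the exact code from the notebook):

import pandas as pd
from dask import delayed
import dask.dataframe as dd

# stand-ins for the two custom functions in the notebook
@delayed
def read_chunk(i):
    return pd.DataFrame({'id': range(i * 10, (i + 1) * 10)})

@delayed
def transform(df):
    return df.assign(doubled=df['id'] * 2)

chunks = [transform(read_chunk(i)) for i in range(4)]
ddf = dd.from_delayed(chunks)  # no meta passed here
ddf.to_parquet('output_parquet')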

When I run the code and watch the Dask dashboard, specifically the task stream, it looks like the tasks are being executed twice: they all run through once, and then the output is passed to my call that exports to Parquet files. I was puzzled about what could be the reason, as my code is pretty simple and I don't have any duplicated parts in the script.

Here is an example screenshot:

I hope the above description is clear enough for you to share your opinion on the root cause with me. Thanks.

Hi @akbaritabar and welcome to Discourse! Thanks for this question and for the easily reproducible code.

@pavithraes and I think the duplication you’re seeing is from the from_delayed call, which will trigger a compute if you don’t pass the meta argument (more on this concept here). Here’s a small snippet:

import pandas as pd
from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

data = pd.DataFrame({'x': range(5), 'y': range(5)})

def func(x):
    return x

delayed_data = delayed(func)(data)
# from_delayed triggers an extra compute to infer the output structure
# if 'meta' is not specified; passing it avoids the duplicated tasks
ddf = dd.from_delayed(
    delayed_data,
    meta=data.head(1)  # a small sample describing the columns and dtypes
)
ddf.to_parquet('foo', engine='pyarrow')

I hope that helps! Additionally, I would encourage you to share your notebook in the showcase topic if you would like; it's a nice illustration of using delayed!

Hi @scharlottej13 and @pavithraes

Thanks a lot for taking the time to reply and for your kind guidance.

I followed your suggestion and updated my script by providing a meta to the from_delayed call (to prevent Dask from having to compute the delayed objects just to infer the output's structure), and it did the trick :slight_smile: thanks a lot.
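For anyone who runs into the same thing later, the updated call looks roughly like this (a sketch with hypothetical column names; in my script the meta matches the columns and dtypes my delayed functions actually return):

import pandas as pd
from dask import delayed
import dask.dataframe as dd

@delayed
def make_chunk(i):
    return pd.DataFrame({'id': range(i * 10, (i + 1) * 10)})

# an empty DataFrame describing the expected columns and dtypes
meta = pd.DataFrame({'id': pd.Series(dtype='int64')})

# with meta supplied, Dask does not compute the delayed objects
# just to infer the output structure
ddf = dd.from_delayed([make_chunk(i) for i in range(4)], meta=meta)
ddf.to_parquet('updated_output_parquet')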
