Hi,
First, thank you for creating (and supporting) Dask, it is amazing.
I have a short Python notebook publicly available here (dask-duckdb-dbeaver/out_of_memory_ETL.ipynb at main · akbaritabar/dask-duckdb-dbeaver · GitHub). It includes two custom functions that use Dask delayed.
When I run the code and watch the Dask dashboard (the task stream in particular), it looks like every task is executed twice: once all the way through, and then again before the output is passed to my call that exports to Parquet files. I was puzzled about what the reason could be, since my code is fairly simple and there are no duplicated parts in the script.
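To give a rough idea of the structure, the notebook does something along these lines (a simplified sketch with placeholder names and data, not the actual notebook code):

import pandas as pd
from dask import delayed
import dask.dataframe as dd

# two custom functions wrapped with delayed (placeholder logic)
@delayed
def load_chunk(i):
    return pd.DataFrame({'id': range(i * 10, (i + 1) * 10)})

@delayed
def transform_chunk(df):
    return df.assign(doubled=df['id'] * 2)

parts = [transform_chunk(load_chunk(i)) for i in range(4)]

# build a Dask DataFrame from the delayed pieces and write it out
ddf = dd.from_delayed(parts)
ddf.to_parquet('output_parquet/', engine='pyarrow')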
Here is an example screenshot:
I hope the above description is clear enough for you to share your opinion on the root cause. Thanks.
Hi @akbaritabar and welcome to discourse! Thanks for this question and for the easily reproducible code.
@pavithraes and I think the duplication you're seeing is from the from_delayed call, which will trigger a compute if you don't pass the meta argument (more on this concept here). Here's a small snippet:
import pandas as pd
from dask import delayed
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

data = pd.DataFrame({'x': range(5), 'y': range(5)})

def func(x):
    return x

delayed_data = delayed(func)(data)

# triggers a compute if 'meta' is not specified
ddf = dd.from_delayed(
    delayed_data,
    meta=data.head(1)
)

ddf.to_parquet('foo', engine='pyarrow')
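As a side note, if building a small sample of the real output is awkward, meta can also be an empty pandas DataFrame that just carries the expected column names and dtypes, for example (a sketch, assuming the same columns as above):

# an empty frame with the right schema also works as 'meta'
empty_schema = pd.DataFrame({'x': pd.Series(dtype='int64'),
                             'y': pd.Series(dtype='int64')})
ddf = dd.from_delayed(delayed_data, meta=empty_schema)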
I hope that helps! Additionally, I would encourage you to share your notebook in the Showcase topic if you would like; it's a nice illustration of using delayed!
Hi @scharlottej13 and @pavithraes
Thanks a lot for taking the time to reply and for your kind guidance.
I followed your suggestion and updated my script by providing a meta to the from_delayed call (to prevent Dask from guessing the output's structure), and it seems to have done the trick. Thanks a lot.
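In case it helps anyone else reading this, the change boils down to something like the following (placeholder names, not the exact notebook code):

# before: Dask computed the delayed input once just to infer the schema
ddf = dd.from_delayed(delayed_parts)

# after: the schema is passed explicitly, so nothing is computed up front
ddf = dd.from_delayed(delayed_parts, meta=expected_schema)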