Dask.compute() not fetching all the results

Hello everyone,

I recently came across dask.delayed as a way to parallelize my code. I have six functions that can execute independently. Each function returns a pandas DataFrame, and once all six DataFrames are returned, I simply concatenate them and apply further transformations to get my final desired output. To parallelize the code, I decorated each function with @dask.delayed. Another delayed function takes all six DataFrames as input and concatenates them before returning the final output.
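
Roughly, the structure looks like this (a simplified sketch with placeholder functions and toy DataFrames, not my actual code):

```python
import dask
import pandas as pd

@dask.delayed
def load_part(part_id):
    # Placeholder for one of the six independent functions;
    # each returns a pandas DataFrame.
    return pd.DataFrame({"part": [part_id] * 3, "value": range(3)})

@dask.delayed
def combine(dfs):
    # Receives all six DataFrames and concatenates them.
    return pd.concat(dfs, ignore_index=True)

parts = [load_part(i) for i in range(6)]
final = combine(parts)

final_df = final.compute()
```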

When I call compute() on the final delayed function, the expected behavior would be to retrieve the combined DataFrame. Once the compute is complete, the rest of the code resumes and transforms this combined DataFrame. However, quite often one of the six DataFrames (seemingly at random) is missing from the final DataFrame, which causes my subsequent flow to break.

If I am understanding this right, could it be that final_df is getting created before all six DataFrames have been retrieved? And if so, how can I ensure that the final function only executes once all six functions have run successfully and returned their respective DataFrames?

What you describe should work in theory; Dask will not run the final concatenation task until all six of its input tasks have finished, so there must be something wrong in your code. Please post an example.

However, the way you’re approaching the problem is suboptimal: after you concatenate the six DataFrames into a single pandas DataFrame, you pay for their full combined size in RAM, and everything you do with the result afterwards runs serially, with no parallelism.
You should use dask.dataframe.from_delayed to generate a dask.dataframe instead and enjoy full parallelism afterwards.
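
For example, something along these lines (a sketch; load_part and the meta definition below are placeholders for your actual functions and schema):

```python
import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def load_part(part_id):
    # Placeholder for one of the six independent functions.
    return pd.DataFrame({"part": [part_id] * 3, "value": range(3)})

parts = [load_part(i) for i in range(6)]

# Build a Dask DataFrame with one partition per delayed object.
# `meta` describes the expected columns/dtypes so Dask doesn't have to guess.
ddf = dd.from_delayed(
    parts,
    meta=pd.DataFrame({"part": pd.Series(dtype="int64"),
                       "value": pd.Series(dtype="int64")}),
)

# Downstream transformations stay lazy and run per partition in parallel;
# call compute() only at the very end if you need a single pandas DataFrame.
result = ddf[ddf["value"] > 0].compute()
```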