Getting an error when executing the tasks in parallel

Hi,
Let me know what you need from my side and I will share it with you. I have shared the DAG with Gus by email. Let me know how I can share it with you, as I can’t see an option for attaching a file. Also, here is the snippet of the code that we are calling.

-------Code-------

# The fiscal calendar is read eagerly (this call is not wrapped in
# dask.delayed); the resulting object is passed to every pipeline below.
df_corp_cldr_days = read_dell_fiscal_calendar_for_days(engine, last_saturday)

# Each measure is built as a lazy three-step chain:
#   fetch -> add missing days -> pivot.
# dask.delayed only records the call in the task graph; nothing runs until
# the graph is computed later in the script.
# NOTE(review): the same `engine` object is handed to every fetch task --
# confirm it can be shared (and serialised, if a process-based or
# distributed scheduler is used) when these tasks run in parallel.

# Forecast
df_final_forecast = dask.delayed(get_forecast_for_days)(
    engine, part, site, df_part_site, last_saturday, df_corp_cldr_days, cfg
)
df_final_forecast = dask.delayed(add_missing_days)(
    df_final_forecast, 'Forecast', 'fisc_wk_strt_dt', df_part_site, df_corp_cldr_days
)
df_final_forecast = dask.delayed(pivot_table_measure)(
    df_final_forecast, sort_col, cfg, week
)

# Backlog
df_final_backlog = dask.delayed(get_backlog_for_days)(
    engine, part, site, df_part_site, current_date, last_saturday,
    df_corp_cldr_days, cfg
)
df_final_backlog = dask.delayed(add_missing_days)(
    df_final_backlog, 'Backlog', 'cldr_dt', df_part_site, df_corp_cldr_days
)
df_final_backlog = dask.delayed(pivot_table_measure)(
    df_final_backlog, sort_col, cfg, week
)

# Future backlog
df_future_backlog = dask.delayed(get_future_backlog_for_days)(
    engine, part, site, df_part_site, df_corp_cldr_days, cfg
)
df_future_backlog = dask.delayed(add_missing_days)(
    df_future_backlog, 'Future Backlog', 'cldr_dt', df_part_site, df_corp_cldr_days
)
df_future_backlog = dask.delayed(pivot_table_measure)(
    df_future_backlog, sort_col, cfg, week
)

# Inventory (uses its own gap-filling helper, which takes no measure or
# date-column name)
df_final_inventory = dask.delayed(get_inventory_for_days)(
    engine, part, site, df_part_site, current_date, last_saturday,
    df_corp_cldr_days, cfg
)
df_final_inventory = dask.delayed(add_missing_days_inventory)(
    df_final_inventory, df_part_site, df_corp_cldr_days
)
df_final_inventory = dask.delayed(pivot_table_measure)(
    df_final_inventory, sort_col, cfg, week
)

# Net supply
df_net_supply = dask.delayed(get_net_supply_for_days)(
    engine, part, site, df_part_site, last_saturday, df_corp_cldr_days, cfg
)
df_net_supply = dask.delayed(add_missing_days)(
    df_net_supply, 'Net Supply', 'cldr_dt', df_part_site, df_corp_cldr_days
)
df_net_supply = dask.delayed(pivot_table_measure)(
    df_net_supply, sort_col, cfg, week
)

# Target DSI
df_target_dsi = dask.delayed(get_target_dsi)(
    engine, week, part, site, df_part_site, days, df_corp_cldr_days,
    last_saturday, cfg
)
df_target_dsi = dask.delayed(add_missing_days)(
    df_target_dsi, 'Target DSI', 'fisc_wk_strt_dt', df_part_site, df_corp_cldr_days
)
df_target_dsi = dask.delayed(pivot_table_measure)(
    df_target_dsi, sort_col, cfg, week
)

# NOTE(review): the message says 5, but six measures are collected in
# df_list below; the text is left unchanged in case logs are matched on it.
print('Called all 5 functions')

# All six lazy measure results, in the order they are handed to merge_data.
df_list = [
    df_final_forecast,
    df_final_backlog,
    df_future_backlog,
    df_final_inventory,
    df_net_supply,
    df_target_dsi,
]

# Optional debugging aids: render or eagerly compute the per-measure graphs.
#img1 = dask.visualize(df_list)
#display(img1)
#df_list = dask.compute(*df_list)

# Still lazy: merge the measures, derive grouped measures, then format.
df_gds = dask.delayed(merge_data)(df_list)
df_final_gds = dask.delayed(calculate_measures_group)(df_gds, week)
df_final_gds = dask.delayed(format_gds)(df_final_gds, compressed, cfg, week)

img = dask.visualize(df_final_gds)
display(img)

# dask.compute returns a tuple with one entry per argument, so unpack the
# single concrete result. The post-processing below must run on this
# computed object: chaining .groupby(...) on the Delayed itself only
# produces another Delayed and never yields the records.
(df_gds_result,) = dask.compute(df_final_gds)

# Assumes format_gds returns a pandas DataFrame with 'Part', 'Site' and
# 'Supplier' columns -- TODO confirm.
# For each (Part, Site, Supplier) group, every column from position 3
# onwards is turned into a list of row dicts stored under 'Measures'.
# NOTE(review): the name `json` shadows the stdlib module; it is kept so
# that any later code reading this variable keeps working.
json = (
    df_gds_result
    .groupby(['Part', 'Site', 'Supplier'])
    .apply(lambda x: x.iloc[0:, 3:].to_dict('records'))
    .reset_index()
    .rename(columns={0: 'Measures'})
    .to_dict(orient='records')
)

print('Get Total GDS Ended')