Getting an error when executing tasks in parallel

Hi,
Getting the following error while executing tasks in parallel.
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object

We have 5 tasks running in parallel, as they are not dependent on each other. These 5 tasks read data from the database in parallel, and once the data is read each one does its own computation before they finally converge into one function that does the main computation. It also fails there, because one of the functions (further down in the DAG) takes longer to execute. We can share the DAG if required.

Thanks
Rohit Gosain


Hi @GosainRohit and welcome! If you could share a minimal reproducer, that would be extremely helpful! It’s hard to tell from the error message alone whether we should start troubleshooting with Dask or SQLAlchemy.
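In case it helps, here is a minimal sketch of the kind of reproducer that would be useful, assuming a SQLAlchemy Engine is being passed into dask.delayed tasks while a distributed Client is active (the connection string and read_table function below are placeholders, not your code):

# Hypothetical minimal reproducer: pass a SQLAlchemy Engine into a delayed
# task while a distributed Client is active, forcing the engine to be pickled.
import dask
from dask.distributed import Client
from sqlalchemy import create_engine

client = Client()                      # local distributed scheduler
engine = create_engine("sqlite://")    # placeholder connection string

def read_table(engine):
    # stand-in for the real database-reading function
    return str(engine.url)

result = dask.delayed(read_table)(engine)
print(dask.compute(result))            # fails here if the engine cannot be pickled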


Hi,
Let me know what you need from my side and I will share it with you. I have shared the DAG with Gus over email. Let me know how I can share it with you here, as I can’t see an option for attaching a file. Also, here is a snippet of the code that we are calling.

-------Code-------

df_corp_cldr_days = read_dell_fiscal_calendar_for_days(engine,last_saturday)
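# Each measure below is built as a chain of dask.delayed calls:
# read from the database, fill in any missing days, then pivot.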

df_final_forecast = dask.delayed(get_forecast_for_days)(engine,part,site,df_part_site,last_saturday,df_corp_cldr_days,cfg)
df_final_forecast = dask.delayed(add_missing_days)(df_final_forecast,'Forecast','fisc_wk_strt_dt',df_part_site,df_corp_cldr_days)
df_final_forecast = dask.delayed(pivot_table_measure)(df_final_forecast,sort_col,cfg,week)

df_final_backlog = dask.delayed(get_backlog_for_days)(engine,part,site,df_part_site,current_date,last_saturday,df_corp_cldr_days,cfg)
df_final_backlog = dask.delayed(add_missing_days)(df_final_backlog,'Backlog','cldr_dt',df_part_site,df_corp_cldr_days)
df_final_backlog = dask.delayed(pivot_table_measure)(df_final_backlog,sort_col,cfg,week)

df_future_backlog = dask.delayed(get_future_backlog_for_days)(engine,part,site,df_part_site,df_corp_cldr_days,cfg)
df_future_backlog = dask.delayed(add_missing_days)(df_future_backlog,'Future Backlog','cldr_dt',df_part_site,df_corp_cldr_days)
df_future_backlog = dask.delayed(pivot_table_measure)(df_future_backlog,sort_col,cfg,week)

df_final_inventory = dask.delayed(get_inventory_for_days)(engine,part,site,df_part_site,current_date,last_saturday,df_corp_cldr_days,cfg)
df_final_inventory = dask.delayed(add_missing_days_inventory)(df_final_inventory,df_part_site,df_corp_cldr_days)
df_final_inventory = dask.delayed(pivot_table_measure)(df_final_inventory,sort_col,cfg,week)

df_net_supply = dask.delayed(get_net_supply_for_days)(engine,part,site,df_part_site,last_saturday,df_corp_cldr_days,cfg)
df_net_supply = dask.delayed(add_missing_days)(df_net_supply,'Net Supply','cldr_dt',df_part_site,df_corp_cldr_days)
df_net_supply = dask.delayed(pivot_table_measure)(df_net_supply,sort_col,cfg,week)

df_target_dsi = dask.delayed(get_target_dsi)(engine,week,part,site,df_part_site,days,df_corp_cldr_days,last_saturday,cfg)
df_target_dsi = dask.delayed(add_missing_days)(df_target_dsi,'Target DSI','fisc_wk_strt_dt',df_part_site,df_corp_cldr_days)
df_target_dsi = dask.delayed(pivot_table_measure)(df_target_dsi,sort_col,cfg,week)

print('Called all 5 functions')

df_list = [df_final_forecast,df_final_backlog,df_future_backlog,df_final_inventory,df_net_supply,df_target_dsi]

#img1 = dask.visualize(df_list)
#display(img1)
#df_list = dask.compute(*df_list)

df_gds = dask.delayed(merge_data)(df_list)
df_final_gds = dask.delayed(calculate_measures_group)(df_gds,week)
df_final_gds = dask.delayed(format_gds)(df_final_gds,compressed,cfg,week)

img = dask.visualize(df_final_gds)
display(img)

json = dask.compute(df_final_gds)

json = df_final_gds.groupby(['Part','Site','Supplier']).apply(lambda x: x.iloc[0:,3:].to_dict('records')).reset_index().rename(columns={0:'Measures'}).to_dict(orient='records')

print('Get Total GDS Ended')

Hi @GosainRohit the code above is not something we can reproduce or run locally to inspect what’s happening. At first sight, somewhere in your code (a traceback might be useful to identify where) Dask is trying to pickle this sqlalchemy.cprocessors.UnicodeResultProcessor, which is not picklable. Is this perhaps something in your data? Do you know where that object comes from? It’s hard to tell without a traceback and/or a minimal reproducible example that we can run.
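One rough way to narrow down where that object is coming from (a sketch, reusing the variable names from your snippet above, so it would be run in your own session) is to try serializing each of the inputs you pass into dask.delayed and see which one fails:

# Try to serialize each object that gets passed into the delayed functions;
# whichever raises is the one Dask cannot ship to the workers.
import cloudpickle

candidates = {
    "engine": engine,
    "df_part_site": df_part_site,
    "df_corp_cldr_days": df_corp_cldr_days,
    "cfg": cfg,
}
for name, obj in candidates.items():
    try:
        cloudpickle.dumps(obj)
        print(name, "serializes fine")
    except Exception as exc:
        print(name, "cannot be serialized:", exc)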

Can you let me know how to trace back to the point where the error is coming from? I won’t be able to share the code/data with you to reproduce it. However, we could get on a call and do a walkthrough of what we have done. Let me know if a call works and I will send out the invite.

Thanks,
Rohit

@GosainRohit You can post the traceback of the error (the message that you get when it fails) between backticks so it will be formatted as code. If you read the lines above the TypeError, they should have more information on where it is failing. If you can post more than the TypeError we might be able to help; otherwise it will be difficult to assist you.

Regarding the call, this space is meant for interacting with the community and we all contribute to it in our personal time, so a call is not something we encourage, as it defeats the purpose of the Discourse forum.


Hi,
Are you looking for the following?


AttributeError                            Traceback (most recent call last)
c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\distributed\protocol\pickle.py in dumps(x, buffer_callback, protocol)
     48             buffers.clear()
---> 49             result = pickle.dumps(x, **dump_kwargs)
     50         if len(result) < 1000:

AttributeError: Can't pickle local object 'create_engine.<locals>.connect'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input> in <module>
      1 b = time.time()
----> 2 json = await get_total_gds(engine,'HFX4T','EMFP',91,last_saturday,current_date,False,True,'')
      3 e = time.time()
      4 print('Timeeee : ', e-b)
      5 print(json)

<ipython-input> in get_total_gds(engine, part, site, days, last_saturday, current_date, week, compressed, cfg)
     84     display(img)
     85
---> 86     json = dask.compute(df_final_gds)
     87     ##print(json)
     88

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\dask\base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    569         postcomputes.append(x.__dask_postcompute__())
    570
--> 571     results = schedule(dsk, keys, **kwargs)
    572     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    573

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\distributed\client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669         Client.compute : Compute asynchronous collections
   2670         """
-> 2671         futures = self._graph_to_futures(
   2672             dsk,
   2673             keys=set(flatten([keys])),

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\distributed\client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2594         # Pack the high level graph before sending it to the scheduler
   2595         keyset = set(keys)
-> 2596         dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
   2597
   2598         # Create futures before sending graph (helps avoid contention)

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\dask\highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys, annotations)
   1074                 "__module__": layer.__module__,
   1075                 "__name__": type(layer).__name__,
   1076                 "state": layer.__dask_distributed_pack__(
   1077                     self.get_all_external_keys(),
   1078                     self.key_dependencies,

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\dask\highlevelgraph.py in __dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
    432             for k, v in dsk.items()
    433         }
--> 434         dsk = toolz.valmap(dumps_task, dsk)
    435         return {"dsk": dsk, "dependencies": dependencies}
    436

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\toolz\dicttoolz.py in valmap(func, d, factory)
     81     """
     82     rv = factory()
---> 83     rv.update(zip(d.keys(), map(func, d.values())))
     84     return rv
     85

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\distributed\worker.py in dumps_task(task)
   4353             return d
   4354     elif not any(map(_maybe_complex, task[1:])):
-> 4355         return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   4356     return to_serialize(task)
   4357

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\distributed\worker.py in warn_dumps(obj, dumps, limit)
   4362 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
   4363     """Dump an object to bytes, warn if those bytes are large"""
-> 4364     b = dumps(obj, protocol=4)
   4365     if not _warn_dumps_warned[0] and len(b) > limit:
   4366         _warn_dumps_warned[0] = True

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\distributed\protocol\pickle.py in dumps(x, buffer_callback, protocol)
     58     try:
     59         buffers.clear()
---> 60         result = cloudpickle.dumps(x, **dump_kwargs)
     61     except Exception as e:
     62         logger.info("Failed to serialize %s. Exception: %s", x, e)

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\cloudpickle\cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
     71             file, protocol=protocol, buffer_callback=buffer_callback
     72         )
---> 73         cp.dump(obj)
     74         return file.getvalue()
     75

c:\users\rohit_gosain\appdata\local\programs\python\python39\lib\site-packages\cloudpickle\cloudpickle_fast.py in dump(self, obj)
    600     def dump(self, obj):
    601         try:
--> 602             return Pickler.dump(self, obj)
    603         except RuntimeError as e:
    604             if "recursion" in e.args[0]:

TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object


@GosainRohit Thank you for the traceback!

AttributeError: Can't pickle local object 'create_engine.<locals>.connect' suggests your code might be trying to pickle the connection to your database.

I think something outside the code snippet you’ve shared is causing this. In your code, what does engine refer to? Perhaps it’s related to how you’re reading the data with Delayed (i.e., the read_dell_fiscal_calendar_for_days function)?
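If engine is a SQLAlchemy Engine created with create_engine, one common workaround (just a sketch, with the connection URL, query, and read_one_measure function below as placeholders rather than your actual code) is to pass the connection string into each task and build the engine inside the function, so that only a plain string has to be serialized:

# Sketch: build the engine inside the task so it never has to be pickled.
import dask
import pandas as pd
from sqlalchemy import create_engine

CONNECTION_URL = "sqlite://"   # placeholder: your real database URL goes here

def read_one_measure(connection_url, query):
    engine = create_engine(connection_url)   # created on the worker
    try:
        return pd.read_sql(query, engine)
    finally:
        engine.dispose()

delayed_df = dask.delayed(read_one_measure)(CONNECTION_URL, "SELECT 1 AS x")
print(dask.compute(delayed_df))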

We really would need a reproducible example to diagnose this further.

The following links might be helpful:

Also, sorry for the delay in response!
