I’m running into an odd serialization error when I attempt to convert a Dask DataFrame into a Dask Array. The Dask client object is connected to a remote cluster; the error does not occur when I run the same code against a local cluster. Here is the relevant part of the log:
2022-05-06 05:34:26,403 - distributed.protocol.pickle - INFO - Failed to serialize (subgraph_callable-2cc3f985-f7cb-4a79-8c96-d4d2257e9b5e, 'from-delayed-ec8e7b03b6f74af03a3c2307fbdebc40', (<function apply at 0x7fd6cdbf5550>, <function _read_sql_chunk at 0x7fd5e96289d0>, [<sqlalchemy.sql.selectable.Select object at 0x7fd6f7d13b50>, 'remote_cluster_address:8123/default', Empty DataFrame
Columns: [feature_a, feature_b, feature_c, label]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'index']]))). Exception: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
2022-05-06 05:34:26,404 - distributed.protocol.core - CRITICAL - Failed to Serialize
Here is the code that generates this error:
import dask.dataframe
import dask.distributed
import sqlalchemy
from sqlalchemy import sql

# dask_scheduler_address, engine, conn_str, target_table, and feature_names
# are defined earlier in my script (omitted here).
dask_client = dask.distributed.Client(
    dask_scheduler_address,
    serializers=['dask'],
    deserializers=['dask']
)

# Reflect the target table and build a SELECT over the feature columns.
sa_metadata = sqlalchemy.MetaData(bind=engine)
sa_table = sqlalchemy.Table(target_table, sa_metadata, autoload_with=engine)
query = sql.select([sql.column(name) for name in feature_names]).select_from(sa_table)

data_df = dask.dataframe.read_sql_query(query, con=conn_str, index_col="index", npartitions=8)
data_df = data_df.fillna(method="ffill", axis=1)
data_array = data_df.to_dask_array(lengths=True)  # error occurs on this line
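For comparison, the same conversion goes through when I point the client at a local cluster. The sketch below is roughly what that variant looks like (the LocalCluster arguments are illustrative placeholders, not my exact settings); query and conn_str are the same objects built above:

# Rough sketch of the local-cluster variant that does NOT hit the pickling error.
# n_workers / threads_per_worker here are assumptions, not my exact configuration.
from dask.distributed import Client, LocalCluster

local_cluster = LocalCluster(n_workers=4, threads_per_worker=1)
local_client = Client(local_cluster)

local_df = dask.dataframe.read_sql_query(query, con=conn_str, index_col="index", npartitions=8)
local_array = local_df.to_dask_array(lengths=True)  # no serialization error here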
I’m not entirely sure how to proceed from here. Any advice would be greatly appreciated.