Serialization error when converting Dask Dataframe to Dask Array

I’m running into an odd serialization error when I attempt to convert a Dask Dataframe into a Dask Array. The Dask client object is connected to a remote cluster. This error does not occur when I try to run this on a local cluster.

2022-05-06 05:34:26,403 - distributed.protocol.pickle - INFO - Failed to serialize (subgraph_callable-2cc3f985-f7cb-4a79-8c96-d4d2257e9b5e, 'from-delayed-ec8e7b03b6f74af03a3c2307fbdebc40', (<function apply at 0x7fd6cdbf5550>, <function _read_sql_chunk at 0x7fd5e96289d0>, [<sqlalchemy.sql.selectable.Select object at 0x7fd6f7d13b50>, 'remote_cluster_address:8123/default', Empty DataFrame
Columns: [feature_a, feature_b, feature_c, label]
Index: []], (<class 'dict'>, [['engine_kwargs', (<class 'dict'>, [])], ['index_col', 'index']]))). Exception: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
2022-05-06 05:34:26,404 - distributed.protocol.core - CRITICAL - Failed to Serialize

Here is my code to generate this error…

dask_client = dask.distributed.Client(
sa_metadata = sqlalchemy.MetaData(bind=engine)
sa_table = sqlalchemy.Table(target_table, sa_metadata, autoload_with=engine)

query = sql.select([sql.column(name) for name in feature_names]).select_from(sa_table)

data_df = dask.dataframe.read_sql_query(query, con=conn_str, index_col="index", npartitions=8)
data_df = data_df.fillna(method="ffill", axis=1)  # fillna returns a new DataFrame, so keep the result
data_array = data_df.to_dask_array(lengths=True)  # error occurs on this line

I’m not entirely sure where to proceed from here… Any advice would be greatly appreciated.

For read_sql_query, you should pass a connection string (a SQLAlchemy URI) rather than a connection object. Each worker must make its own connection.

con : str
    Full sqlalchemy URI for the database connection

I am not passing a connection object to read_sql_query(); I am passing the URI string.
It seems to me that the serialization problem comes from the SQLAlchemy Select object for the query, not from a SQLAlchemy connection object.
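
One way to check which argument is the culprit is to try pickling each one locally before handing it to the cluster. The sketch below is only illustrative: find_unpicklable is a hypothetical helper, and it uses the standard-library pickle module, whereas distributed may also fall back to cloudpickle, so the result is a hint rather than a guarantee. A threading.Lock stands in for the unpicklable object so that the example runs without a database.

```python
import pickle
import threading


def find_unpicklable(**candidates):
    """Return {name: error message} for every argument that pickle rejects."""
    failures = {}
    for name, obj in candidates.items():
        try:
            pickle.dumps(obj)
        except Exception as exc:  # pickle raises different types depending on the object
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


# Stand-in demo: a plain URI string pickles fine, a lock does not.
report = find_unpicklable(
    con="remote_cluster_address:8123/default",
    lock=threading.Lock(),
)
print(report)  # only the "lock" entry is reported
```

In the code from the question, the equivalent call would be find_unpicklable(query=query, con=conn_str), which should show whether the Select object or the connection string fails to serialize.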

I managed to find a workaround by splitting up the query, using pandas.read_sql(), and building up the Dask DataFrame partition by partition using dask.dataframe.from_delayed().
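
A minimal sketch of that workaround, under several assumptions, could look like the following. The helper names (chunk_queries, load_chunk, read_sql_by_chunks) are hypothetical, the index column is assumed to be an integer with known lower and upper bounds, and identifiers are wrapped in double quotes, which not every SQL dialect accepts. Each task receives only plain strings, so no SQLAlchemy object has to be pickled.

```python
def chunk_queries(table, columns, index_col, lower, upper, npartitions):
    """Build at most `npartitions` SQL strings covering lower..upper (inclusive) on index_col.

    Assumption: names come from trusted code, since they are interpolated directly.
    """
    step = -(-(upper - lower + 1) // npartitions)  # ceiling division
    cols = ", ".join(f'"{name}"' for name in [index_col, *columns])
    queries = []
    for start in range(lower, upper + 1, step):
        stop = min(start + step, upper + 1)
        queries.append(
            f'SELECT {cols} FROM "{table}" '
            f'WHERE "{index_col}" >= {start} AND "{index_col}" < {stop}'
        )
    return queries


def load_chunk(sql_text, conn_str, index_col):
    """Run on a worker: open a fresh connection from the URI and read one chunk."""
    # Imported here so that the imports happen on the worker executing the task.
    import pandas as pd
    import sqlalchemy

    engine = sqlalchemy.create_engine(conn_str)
    try:
        with engine.connect() as conn:
            return pd.read_sql(sqlalchemy.text(sql_text), conn, index_col=index_col)
    finally:
        engine.dispose()


def read_sql_by_chunks(queries, conn_str, index_col, meta=None):
    """Assemble a Dask DataFrame with one partition per query string."""
    import dask
    import dask.dataframe as dd

    parts = [dask.delayed(load_chunk)(q, conn_str, index_col) for q in queries]
    # Without `meta`, Dask computes the first partition to infer column dtypes.
    return dd.from_delayed(parts, meta=meta)


# Only the query-splitting step is exercised here; it needs no database.
example_queries = chunk_queries("my_table", ["feature_a", "label"], "index", 0, 9, 4)
```

With the names from the question, the call might be read_sql_by_chunks(chunk_queries(target_table, feature_names, "index", lower, upper, 8), conn_str, "index"), where lower and upper are index bounds that have to be looked up separately.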