JSON deserialisation does not work with Dask's read_sql_query, but does with pandas'

Hello,

pandas' read_sql_query successfully uses the engine's json_deserializer to deserialise JSON into Python objects, but Dask's read_sql_query does not.

We use the following versions:

  • dask: 2024.6.2
  • SQLAlchemy: 1.4.52
  • pandas: 2.1.4

We basically create an engine, which is bound to the session and used both for reading the SQL query with pandas and for JSON-deserialising some of the string columns in the resulting DataFrame:

import jsonpickle
import jsonpickle.ext.numpy as jsonpickle_numpy
import sqlalchemy
import sqlalchemy.orm

jsonpickle_numpy.register_handlers()
jsonpickle.set_preferred_backend("simplejson")
jsonpickle.set_encoder_options("simplejson", ignore_nan=True)

db_engine = sqlalchemy.create_engine(
    RuntimeConfig().store_uri,
    json_serializer=jsonpickle.dumps,
    json_deserializer=jsonpickle.loads,
)

db_session_maker = sqlalchemy.orm.sessionmaker()
db_session_maker.configure(bind=db_engine)

session = db_session_maker()

We then create a query as a SQLAlchemy selectable (return type: sqlalchemy.sql.expression.Select), which pandas executes successfully; the json_deserializer specified on the engine is used.

import pandas as pd

df = pd.read_sql_query(
    query, session.bind, index_col="INDEX_COL"
)

On the other hand, if we use the following, it also successfully returns the DataFrame, but the JSON columns are not deserialised.

df = dd.read_sql_query(
    query,
    session.bind.url.render_as_string(hide_password=False),
    index_col="INDEX_COL",
    engine_kwargs={"json_deserializer": jsonpickle.loads},
).compute()

As a workaround, I now manually apply jsonpickle.loads to the affected DataFrame columns.
But the question remains why it does not work with Dask, although the engine appears to be created the same way in the Dask source code.
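
Roughly, the workaround looks like this (just a sketch; "JSON_COL" is a placeholder for whichever columns contain jsonpickle-encoded strings):

import jsonpickle

# df is the computed (pandas) DataFrame from the dd.read_sql_query call above
for col in ["JSON_COL"]:
    # guard against NULLs that come back as NaN instead of strings
    df[col] = df[col].map(lambda s: jsonpickle.loads(s) if isinstance(s, str) else s)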

I also noticed that the native Python deserialisation of sqlite3 TIMESTAMP columns into datetime objects (with and without a json_deserializer) does not work with the dd.read_sql_query function either.

Hi @tlogemann, welcome to Dask Discourse!

In your main code you also register jsonpickle handlers; is that also the case with Dask? Do you use the Distributed or the multiprocessing scheduler?

cc @crusaderky

Hello @guillaumeeb !

Yes, the jsonpickle handlers should also be registered with Dask. At least, I thought this would be done by passing the engine_kwargs to the read_sql_query call: engine_kwargs={"json_deserializer": jsonpickle.loads}.

We simply use the Dask DataFrame API, so the dd in the third code block comes from

import dask.dataframe as dd

I thought that should be sufficient for registering the jsonpickle handlers for deserialising, shouldn't it?

Regarding the distributed or multiprocessing scheduler, I am not sure.
The code I provided is basically all of our Dask setup, so since we don't use anything from dask.distributed, it uses the multiprocessing scheduler by default?

Regards

If you don’t use LocalCluster or Client or anything else, then the default scheduler for DataFrame is the local threaded scheduler. I was asking because this could have had an impact if other processes were involved, but this doesn’t look to be the case.
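
If you want to check or force it, something like this should work (just a sketch, ddf being the Dask DataFrame before .compute()):

import dask

# None unless a scheduler was configured explicitly
dask.config.get("scheduler", default=None)

# Force a particular local scheduler for one computation
df = ddf.compute(scheduler="threads")        # default for DataFrame
# df = ddf.compute(scheduler="processes")    # multiprocessing
# df = ddf.compute(scheduler="synchronous")  # single-threaded, handy for debugging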

For the rest, I really don’t know why this is not taken into account, or how to do it.
cc @martindurant @crusaderky

Sorry, no good idea on this one. I agree that if you have workers in different processes, you probably need to run those initialisation lines for setting up JSON on every worker before running the task itself.
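
Something along these lines, as a rough sketch, and only relevant once you actually use the distributed scheduler with worker processes:

from dask.distributed import Client

def setup_jsonpickle():
    import jsonpickle
    import jsonpickle.ext.numpy as jsonpickle_numpy

    jsonpickle_numpy.register_handlers()
    jsonpickle.set_preferred_backend("simplejson")
    jsonpickle.set_encoder_options("simplejson", ignore_nan=True)

client = Client()
client.run(setup_jsonpickle)  # execute the setup once on every worker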

I traced it down (at least halfway) and it turns out that SQLAlchemy treats the following queries differently.

Our (main) query looks like this:

    query = (
        sa.select(
            MuscleAction.id.label("muscle_action_id"),
            MuscleAction.walltime.label("muscle_action_walltime"),
            MuscleAction.simtimes.label("muscle_action_simtimes"),
            MuscleAction.sensor_readings.label("muscle_sensor_readings"),
            MuscleAction.actuator_setpoints.label("muscle_actuator_setpoints"),
            MuscleAction.rewards.label("muscle_action_rewards"),
            MuscleAction.objective.label("muscle_action_objective"),
            MuscleAction.done.label("muscle_action_done"),
            Agent.id.label("agent_id"),
            Agent.uid.label("agent_uid"),
            Agent.name.label("agent_name"),
            ExperimentRunPhase.id.label("experiment_run_phase_id"),
            ExperimentRunPhase.uid.label("experiment_run_phase_uid"),
            ExperimentRunPhase.configuration.label(
                "experiment_run_phase_configuration"
            ),
            ExperimentRunInstance.uid.label("experiment_run_instance_uid"),
            ExperimentRun.id.label("experiment_run_id"),
            ExperimentRun.uid.label("experiment_run_uid"),
            Experiment.id.label("experiment_id"),
            Experiment.name.label("experiment_name"),
        )
        .select_from(MuscleAction)
        .join(Agent)
        .join(ExperimentRunPhase)
        .join(ExperimentRunInstance)
        .join(ExperimentRun)
        .join(Experiment)
        .where(MuscleAction.actuator_setpoints != sa.JSON.NULL)
    )

If I directly use that query with the deserialisation option for Dask from my original post, then the deserialisation works halfway but is still broken (more on that later).

But there is a problem that prevents me from using that query directly: we want to support both sqlite3 and Postgres. With sqlite this works, but the partition limiting that Dask appends on the index column is faulty, or at least not compatible with that query, on Postgres.

Here is a Postgres-mogrified (parameter-bound) version of the query:

SELECT muscle_actions.id AS muscle_action_id,
       muscle_actions.walltime AS muscle_action_walltime,
       muscle_actions.simtimes AS muscle_action_simtimes,
       muscle_actions.sensor_readings AS muscle_sensor_readings,
       muscle_actions.actuator_setpoints AS muscle_actuator_setpoints,
       muscle_actions.rewards AS muscle_action_rewards,
       muscle_actions.objective AS muscle_action_objective,
       muscle_actions.done AS muscle_action_done,
       agents.id AS agent_id,
       agents.uid AS agent_uid,
       agents.name AS agent_name,
       experiment_run_phases.id AS experiment_run_phase_id,
       experiment_run_phases.uid AS experiment_run_phase_uid,
       experiment_run_phases.configuration AS experiment_run_phase_configuration,
       experiment_run_instances.uid AS experiment_run_instance_uid,
       experiment_runs.id AS experiment_run_id,
       experiment_runs.uid AS experiment_run_uid,
       experiments.id AS experiment_id,
       experiments.name AS experiment_name
FROM muscle_actions
JOIN agents ON agents.id = muscle_actions.agent_id
JOIN experiment_run_phases ON experiment_run_phases.id = agents.experiment_run_phase_id
JOIN experiment_run_instances ON experiment_run_instances.id = experiment_run_phases.experiment_run_instance_id
JOIN experiment_runs ON experiment_runs.id = experiment_run_instances.experiment_run_id
JOIN experiments ON experiments.id = experiment_runs.experiment_id
WHERE muscle_actions.actuator_setpoints != -3627583194670394003
  AND experiments.id IN ('99')
  AND experiment_runs.uid IN ('Palaestrai pendulum env run sac kan')
  AND experiment_run_instances.uid IN ('74cffac3-b43f-4403-8e22-bbcaeb8f74c7')
  AND experiment_run_phases.uid IN ('Training Phase')
  AND agents.uid IN ('gym_agent')

Executing that query (as the SQLAlchemy selectable through the Dask read_sql_query function, of course) produces the following error with Postgres:

>       cursor.execute(statement, parameters)
E       sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedColumn) column "muscle_action_id" does not exist
E       LINE 3: ...ining Phase') AND agents.uid IN ('gym_agent') AND muscle_act...
E                                                                    ^
E       
E       [SQL: SELECT muscle_actions.id AS muscle_action_id, muscle_actions.walltime AS muscle_action_walltime, muscle_actions.simtimes AS muscle_action_simtimes, muscle_actions.sensor_readings AS muscle_sensor_readings, muscle_actions.actuator_setpoints AS muscle_actuator_setpoints, muscle_actions.rewards AS muscle_action_rewards, muscle_actions.objective AS muscle_action_objective, muscle_actions.done AS muscle_action_done, agents.id AS agent_id, agents.uid AS agent_uid, agents.name AS agent_name, experiment_run_phases.id AS experiment_run_phase_id, experiment_run_phases.uid AS experiment_run_phase_uid, experiment_run_phases.configuration AS experiment_run_phase_configuration, experiment_run_instances.uid AS experiment_run_instance_uid, experiment_runs.id AS experiment_run_id, experiment_runs.uid AS experiment_run_uid, experiments.id AS experiment_id, experiments.name AS experiment_name 
E       FROM muscle_actions JOIN agents ON agents.id = muscle_actions.agent_id JOIN experiment_run_phases ON experiment_run_phases.id = agents.experiment_run_phase_id JOIN experiment_run_instances ON experiment_run_instances.id = experiment_run_phases.experiment_run_instance_id JOIN experiment_runs ON experiment_runs.id = experiment_run_instances.experiment_run_id JOIN experiments ON experiments.id = experiment_runs.experiment_id 
E       WHERE muscle_actions.actuator_setpoints != %(actuator_setpoints_1)s AND experiments.id IN (%(id_1_1)s) AND experiment_runs.uid IN (%(uid_1_1)s) AND experiment_run_instances.uid IN (%(uid_2_1)s) AND experiment_run_phases.uid IN (%(uid_3_1)s) AND agents.uid IN (%(uid_4_1)s) AND muscle_action_id >= %(muscle_action_id_1)s AND muscle_action_id <= %(muscle_action_id_2)s]
E       [parameters: {'actuator_setpoints_1': 'null', 'muscle_action_id_1': 23632902, 'muscle_action_id_2': 23640618, 'id_1_1': '99', 'uid_1_1': 'Palaestrai pendulum env run sac kan', 'uid_2_1': '74cffac3-b43f-4403-8e22-bbcaeb8f74c7', 'uid_3_1': 'Training Phase', 'uid_4_1': 'gym_agent'}]
E       (Background on this error at: https://sqlalche.me/e/14/f405)

This error is raised because Postgres, unlike sqlite, does not allow referencing labels in the WHERE clause of the same-level SELECT statement, and Dask appends its partition bounds (muscle_action_id >= ... AND muscle_action_id <= ...) using that label.

On the other hand, if I discard the label and use index_col='muscle_actions.id' as follows:

SELECT muscle_actions.id,
       muscle_actions.walltime AS muscle_action_walltime,
       muscle_actions.simtimes AS muscle_action_simtimes,
       muscle_actions.sensor_readings AS muscle_sensor_readings,
       muscle_actions.actuator_setpoints AS muscle_actuator_setpoints,
       muscle_actions.rewards AS muscle_action_rewards,
       muscle_actions.objective AS muscle_action_objective,
       muscle_actions.done AS muscle_action_done,
       agents.id AS agent_id,
       agents.uid AS agent_uid,
       agents.name AS agent_name,
       experiment_run_phases.id AS experiment_run_phase_id,
       experiment_run_phases.uid AS experiment_run_phase_uid,
       experiment_run_phases.configuration AS experiment_run_phase_configuration,
       experiment_run_instances.uid AS experiment_run_instance_uid,
       experiment_runs.id AS experiment_run_id,
       experiment_runs.uid AS experiment_run_uid,
       experiments.id AS experiment_id,
       experiments.name AS experiment_name
FROM muscle_actions
JOIN agents ON agents.id = muscle_actions.agent_id
JOIN experiment_run_phases ON experiment_run_phases.id = agents.experiment_run_phase_id
JOIN experiment_run_instances ON experiment_run_instances.id = experiment_run_phases.experiment_run_instance_id
JOIN experiment_runs ON experiment_runs.id = experiment_run_instances.experiment_run_id
JOIN experiments ON experiments.id = experiment_runs.experiment_id
WHERE muscle_actions.actuator_setpoints != -152419438380283586
  AND experiments.id IN ('99')
  AND experiment_runs.uid IN ('Palaestrai pendulum env run sac kan')
  AND experiment_run_instances.uid IN ('74cffac3-b43f-4403-8e22-bbcaeb8f74c7')
  AND experiment_run_phases.uid IN ('Training Phase')
  AND agents.uid IN ('gym_agent')

Then it raises the following error:

>           raise KeyError(f"None of {missing} are in the columns")
E           KeyError: "None of ['muscle_actions.id'] are in the columns"

This is because Dask internally strips the table name 'muscle_actions' and treats the column only as 'id'. But when using only 'id' as index_col, the column reference is ambiguous:

>       cursor.execute(statement, parameters)
E       sqlalchemy.exc.ProgrammingError: (psycopg2.errors.AmbiguousColumn) column reference "id" is ambiguous
E       LINE 3: ...ining Phase') AND agents.uid IN ('gym_agent') AND id >= 2363...

So I don’t see how to solve it with that query alone. But I can use a wrapping SELECT *, which allows referencing the label in the outer SELECT and makes Postgres happy. More concretely, I use

query = sa.select("*").select_from(query)

before the actual dd.read_sql_query call.
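
For completeness, the Dask call then looks roughly like this (same arguments as in my original post, just with the label as index_col):

df = dd.read_sql_query(
    query,
    session.bind.url.render_as_string(hide_password=False),
    index_col="muscle_action_id",
    engine_kwargs={"json_deserializer": jsonpickle.loads},
).compute()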

This works in the sense that the data gets successfully queried and returned as a DataFrame, but it is not properly deserialised. (Here comes the problem teased above.)

The py/object with the py/state data is somehow parsed, but the class objects are not properly instantiated; they are instead represented as strings. It seems that the result of the classes' __repr__ method is used directly as a string instead of instantiating the class.

At first sight, it does not seem to be an upstream fault, because pandas properly deserialises the objects even with the SELECT * wrapper:

    query = sa.select("*").select_from(query)
    res = pd.read_sql_query(
        query, session.bind, index_col="muscle_action_id"
    )

Do you think you would be able to build a reproducible example with sqlite and open an issue on the Dask GitHub tracker?
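
Something along these lines would probably already be enough (a rough sketch; table, column and file names are made up):

import json
import pandas as pd
import sqlalchemy as sa
import dask.dataframe as dd

engine = sa.create_engine(
    "sqlite:///repro.db",
    json_serializer=json.dumps,
    json_deserializer=json.loads,
)
meta = sa.MetaData()
items = sa.Table(
    "items",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("payload", sa.JSON),
)
meta.create_all(engine)
with engine.begin() as conn:
    conn.execute(
        items.insert(),
        [{"id": i, "payload": {"value": i}} for i in range(10)],
    )

query = sa.select(items.c.id, items.c.payload)

# pandas: payload should come back as deserialised dicts
pdf = pd.read_sql_query(query, engine, index_col="id")

# Dask: according to this thread, payload stays a raw JSON string
dask_df = dd.read_sql_query(
    query,
    "sqlite:///repro.db",
    index_col="id",
    npartitions=2,
    engine_kwargs={"json_deserializer": json.loads},
).compute()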