String index divisions not working?

I’m dumping tables to Parquet files using a SQL query. When the table’s id is an int, everything works as expected. When it’s a string, the divisions don’t seem to exist when I load the data back with read_parquet.

Let’s say I have two different tables corresponding to df1 and df2. df1 has an int index and df2 has a string index.

Working example:

import dask.dataframe as dd

df1 = dd.read_sql_query(df1_query, con=con_string, index_col='id', bytes_per_chunk="256 MiB")
dd.to_parquet(df1, 'data/output/df1', engine='pyarrow', compression='snappy')
...
df1 = dd.read_parquet("data/output/df1", engine='pyarrow', calculate_divisions=True)

assert df1.known_divisions  # all good!

Failing example:

import string
import dask.dataframe as dd

string_divisions = list(string.ascii_lowercase)

df2 = dd.read_sql_query(df2_query, con=con_string, index_col='id', divisions=string_divisions)
dd.to_parquet(df2, 'data/output/df2', engine='pyarrow', compression='snappy')
...
df2 = dd.read_parquet("data/output/df2", engine='pyarrow', calculate_divisions=True)

assert df2.known_divisions  # not good!

Any help is appreciated.

EDIT:
Could this be caused by a mismatch between how my PG instance sorts strings (collation “en_US.UTF-8”) and how Python sorts them?

If I look at the first characters of the string ids, ordered the way PG sorts them, I get this order:

['0', '1', '2', '3', '4', '5', '7', '8', '9', 'a', '.', '-', 'b', 'c', '%', 'd', 'e', 'f', 'g', 'h', 'i', "'", 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']

Using that list as my divisions for read_sql_query raises ValueError: divisions must be sorted.
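Python compares str values by Unicode code point, so punctuation such as '%', "'", '-' and '.' sorts before digits and letters, while the PG order above suggests the en_US collation largely ignores leading punctuation. A quick stdlib-only check shows why that list is rejected; is_sorted is a hypothetical stand-in for Dask's own validation, not its actual code:

```python
# First characters of the ids in the order Postgres (en_US collation) returned them
pg_order = ['0', '1', '2', '3', '4', '5', '7', '8', '9', 'a', '.', '-', 'b', 'c',
            '%', 'd', 'e', 'f', 'g', 'h', 'i', "'", 'j', 'k', 'l', 'm', 'n', 'o',
            'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']


def is_sorted(values):
    """True if values are in non-decreasing Python (code point) order."""
    return all(a <= b for a, b in zip(values, values[1:]))


# Python's own ordering puts the punctuation first: '%' (U+0025), "'" (U+0027),
# '-' (U+002D) and '.' (U+002E) all come before '0' (U+0030) and 'a' (U+0061).
python_order = sorted(pg_order)

print(is_sorted(pg_order))  # False: '.' comes after 'a' in the PG list
print(python_order[:6])     # ['%', "'", '-', '.', '0', '1']
```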

I just tried this simple code sample:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(list(range(26)), index=list('abcdefghijklmnopqrstuvwxyz'), columns=['string'])
ddf = dd.from_pandas(df, 4)
dd.to_parquet(ddf, '/tmp/ddf', engine='pyarrow', compression='snappy')
ddf2 = dd.read_parquet("/tmp/ddf", engine='pyarrow', calculate_divisions=True)
ddf2.known_divisions

which returns True, so string index divisions do work in this simple case.
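For context: with calculate_divisions=True, read_parquet derives divisions from the min/max statistics of the index column in each file (or row group), and, as far as I can tell from the Dask source, falls back to unknown divisions when those ranges are not in increasing order. The helper below is a simplified, hypothetical model of that check, not Dask's actual function, and it assumes empty partitions are simply skipped:

```python
from typing import Optional, Sequence


def divisions_from_stats(partitions: Sequence[Sequence[str]]) -> Optional[tuple]:
    """Simplified model of turning per-partition min/max statistics into divisions.

    Returns a divisions tuple (min of each partition, then max of the last one),
    or None when the ranges overlap, i.e. when divisions would be unknown.
    Assumption: empty partitions carry no statistics and are skipped.
    """
    stats = [(min(part), max(part)) for part in partitions if part]
    if not stats:
        return None
    divisions = [stats[0][0]]
    prev_max = stats[0][1]
    for lo, hi in stats[1:]:
        # Each partition must start at or after the previous partition's max
        if lo < prev_max:
            return None
        divisions.append(lo)
        prev_max = hi
    divisions.append(prev_max)
    return tuple(divisions)


# 26 letters split into 4 sorted, non-overlapping chunks
# (the exact split that from_pandas uses may differ)
letters = list('abcdefghijklmnopqrstuvwxyz')
print(divisions_from_stats([letters[0:7], letters[7:14], letters[14:21], letters[21:]]))
# ('a', 'h', 'o', 'v', 'z')

# Overlapping ranges: the second partition starts before the first one ends
print(divisions_from_stats([['b', 'x'], ['c', 'd']]))  # None
```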

I think I’ve come up with a reproduction.

SQLite doesn’t seem to support locale-based collations like this one, so unfortunately you need a Postgres database with the en_US.UTF-8 locale installed to reproduce this. With DATABASE_URI pointing at that database, the following seems to pinpoint the issue:

import pandas as pd
import dask.dataframe as dd
import random
import sqlalchemy

id_data = ['faculdade-fge', 'cultura-inglesa', '%D']
random_numbers = [random.randint(0, 100) for _ in range(len(id_data))]

df = pd.DataFrame(dict(id=id_data, val=random_numbers)).set_index('id').sample(frac=1)

engine = sqlalchemy.create_engine(DATABASE_URI)

with engine.connect() as connection:
    with connection.begin():
        connection.exec_driver_sql('''
            DROP TABLE IF EXISTS test_table;
            CREATE TABLE test_table (
                id TEXT PRIMARY KEY COLLATE "en_US.utf8",
                val INTEGER
            );
        ''')

        for index, row in df.iterrows():
            connection.exec_driver_sql(
                'INSERT INTO test_table (id, val) VALUES (%(id)s, %(value)s)',
                dict(id=index, value=int(row['val']))
            )

# Load the data using Dask
ddf = dd.read_sql_table("test_table", con=DATABASE_URI, index_col='id', divisions=list('abcdefghijklmnopqrstuvwxyz'))

assert ddf.known_divisions

dd.to_parquet(ddf, 'test.parquet', engine='pyarrow', compression='snappy')
test_df = dd.read_parquet('test.parquet', engine='pyarrow', calculate_divisions=True)

assert test_df.known_divisions # fails!

I notice two ways to make this pass:

  • Change the string '%D' to the string '%'
  • Use the default collation instead, i.e. drop COLLATE "en_US.utf8" from the id column definition so it reads id TEXT PRIMARY KEY,
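One plausible reason the default collation passes: if the database's default collation is "C" (an assumption worth checking against pg_database.datcollate), Postgres compares text byte by byte, and for UTF-8 that byte order matches Python's code-point order. A small stdlib check of that equivalence, using the ids from the repro plus a few made-up ones:

```python
# Ids from the reproduction, plus a few hypothetical extra ones with
# punctuation, upper case and a non-ASCII character
ids = ['faculdade-fge', 'cultura-inglesa', '%D', "o'neil", 'Zoo', 'ação']

# Python orders str values by Unicode code point ...
python_sorted = sorted(ids)
# ... while a "C"-collation column is ordered by the raw UTF-8 bytes.
byte_sorted = sorted(ids, key=lambda s: s.encode('utf-8'))

print(python_sorted == byte_sorted)  # True
print(python_sorted)
```

UTF-8 was designed so that byte-wise comparison preserves code-point order, which is why the two sorts agree.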

Hope this helps

So the problem seems to occur when reading from SQL with certain parameters. Could you check what the divisions are on the ddf you get back from the SQL read?

Is this workaround enough for you, or can’t you do this with your real data? It seems to be an encoding problem when using SQL.

The divisions print as expected ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z')

I’m not familiar with Dask internals, but from skimming the code it looks like the provided divisions are used to generate the SQL statements. If the sort order defined by the text column’s collation differs from Python’s sort order, I assume the following happens:

  • Dask generates the SQL statements using the divisions as bounds, and assumes that what it gets back is indeed ‘sorted’ by its own standards
  • Somewhere while saving or reading the Parquet files, some code realizes that the partitions are not actually sorted in Python sort order
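The hypothesis above can be simulated without a database. This is a toy model under several assumptions: the bounds mirror the WHERE clauses Dask appears to generate (lower <= id < upper, with the last upper bound inclusive); the hypothetical collation_key is a crude stand-in for en_US.utf8 that only ignores punctuation and case (real glibc collation is more involved); and empty partitions are skipped when checking per-partition min/max values in Python order.

```python
from typing import Callable, List


def partition(ids: List[str], divisions: List[str], key: Callable[[str], str]) -> List[List[str]]:
    """Assign ids to partitions using [lower, upper) bounds compared via `key`.

    Assumed to mirror the WHERE clauses built from the divisions; the last
    partition's upper bound is inclusive. Ids outside all bounds are dropped.
    """
    parts = []
    bounds = list(zip(divisions[:-1], divisions[1:]))
    for i, (lower, upper) in enumerate(bounds):
        last = i == len(bounds) - 1
        lo, hi = key(lower), key(upper)
        parts.append([
            x for x in ids
            if lo <= key(x) and (key(x) <= hi if last else key(x) < hi)
        ])
    return parts


def ranges_are_ordered(parts: List[List[str]]) -> bool:
    """In Python order, each non-empty partition must start at or after the previous one's max."""
    stats = [(min(p), max(p)) for p in parts if p]
    return all(prev[1] <= cur[0] for prev, cur in zip(stats, stats[1:]))


def collation_key(s: str) -> str:
    """Hypothetical, crude stand-in for en_US.utf8: ignore punctuation and case."""
    return ''.join(ch for ch in s if ch.isalnum()).lower()


def python_key(s: str) -> str:
    """Plain code-point order, as used by Python (and by a "C" collation)."""
    return s


divisions = list('abcdefghijklmnopqrstuvwxyz')

for label, ids, key in [
    ('en_US-like, %D', ['faculdade-fge', 'cultura-inglesa', '%D'], collation_key),
    ('en_US-like, %', ['faculdade-fge', 'cultura-inglesa', '%'], collation_key),
    ('code point, %D', ['faculdade-fge', 'cultura-inglesa', '%D'], python_key),
]:
    parts = partition(ids, divisions, key)
    print(label, [p for p in parts if p], ranges_are_ordered(parts))
```

In the first case '%D' lands in the 'd' partition, whose Python-order minimum ('%D') is smaller than the previous partition's maximum ('cultura-inglesa'), so the ranges overlap. In this toy model the two passing cases pass because the '%'-prefixed id falls outside the a..z bounds altogether and is dropped, not because it is placed correctly.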

Is this workaround enough for you, or you cannot do this with your real data?

No, what I mentioned is not a workaround for me. I’ve given up and added an auto-incrementing integer id, with an index, to these tables. But if this is a bug in Dask, I’m happy to help test anything else if needed.

Just wondering: in your example you specify the divisions when calling read_sql, but are you confident they cover all the string ids in your database?
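One way to check is to compare every id against the outer bounds in Python order: anything below divisions[0] or above divisions[-1] is outside every partition's range. A hypothetical stdlib helper (the function name and example ids are made up for illustration); whether Postgres actually excludes such rows depends on how the column's collation compares them:

```python
from typing import Iterable, List


def uncovered_ids(ids: Iterable[str], divisions: List[str]) -> List[str]:
    """Return ids outside [divisions[0], divisions[-1]] in Python (code point) order."""
    first, last = divisions[0], divisions[-1]
    return [x for x in ids if not (first <= x <= last)]


divisions = list('abcdefghijklmnopqrstuvwxyz')
ids = ['apple', 'zebra', '%D', 'Zoo', '0800-number', 'z']  # hypothetical ids
print(uncovered_ids(ids, divisions))
# ['zebra', '%D', 'Zoo', '0800-number']
```

Note that 'zebra' is uncovered because 'z' is a prefix of it and therefore sorts before it, so a last division of 'z' excludes every longer id starting with 'z'.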