Dask read sql - Index column requirements

Hello guys!
I am migrating most of my code from pandas to Dask due to memory limitations.
One point I am struggling with is the `read_sql` function.

I am trying to read a simple table:
df = dd.read_sql_table('SBB', f'ibm_db_sa://{usr}:{pwd}@XXX.xx.com.br:50100/XXXXX;', index_col='transaction_date')

When reading with pandas, I do not need an index column. I understand that Dask requires one so it can read the data across parallel queries based on that index (or so I thought!).

1) Why can't I use a simple orderable column instead of an indexed column? In my example, I could split the table on the date column even though that column is not indexed in the table.

2) If I try to read my table anyway, I get an error about lacking access to some system tables. My question is: why does Dask try to read that information about my table? I see it is related to SQLAlchemy's autoreflection, but why does Dask need it, and why doesn't pandas?

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
File c:\Users\X\AppData\Local\Programs\Python\Python311\Lib\site-packages\ibm_db_dbi.py:1312, in Cursor._execute_helper(self, parameters)
   1311 try:
-> 1312     return_value = ibm_db.execute(self.stmt_handler, parameters)
   1313 if not return_value:

Exception: Statement Execute Failed: [IBM][CLI Driver][DB2] SQL0551N The statement failed because the authorization ID does not have the required authorization or privilege to perform the operation. Authorization ID: "XXXX". Operation: "SELECT". Object: "SYSIBM.SYSTABCONST". SQLSTATE=42501 SQLCODE=-551

During handling of the above exception, another exception occurred:

ProgrammingError                          Traceback (most recent call last)

...

[SQL: SELECT "SYSIBM"."SYSKEYCOLUSE"."CONSTNAME", "SYSIBM"."SYSKEYCOLUSE"."COLNAME" FROM "SYSIBM"."SYSKEYCOLUSE", "SYSIBM"."SYSTABCONST" WHERE "SYSIBM"."SYSKEYCOLUSE"."CONSTNAME" = "SYSIBM"."SYSTABCONST"."CONSTNAME" AND "SYSIBM"."SYSTABCONST"."TBNAME" = ? AND "SYSIBM"."SYSTABCONST"."TBCREATOR" = ? AND "SYSIBM"."SYSTABCONST"."TYPE" = ? ORDER BY "SYSIBM"."SYSKEYCOLUSE"."CONSTNAME"] [parameters: ('SBB', 'schema', 'U')] (Background on this error at: https://sqlalche.me/e/20/f405)

You are not required to have your column indexed in the DB engine, but it is recommended for performance. Dask has no way to determine whether it is indexed or not.

why does dask try to read such information

Dask wants to know the data types of your query, so that it can build a lazy representation of the table and work out the appropriate bounds for the partitions. If you pass `divisions=` and `meta=`, it may be that the lookup is not required:

    divisions: sequence
        Values of the index column to split the table by. If given, this will
        override ``npartitions`` and ``bytes_per_chunk``. The divisions are the value
        boundaries of the index column used to define the partitions. For
        example, ``divisions=list('acegikmoqsuwz')`` could be used to partition
        a string column lexicographically into 12 partitions, with the implicit
        assumption that each partition contains similar numbers of records.
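Put together, the suggestion looks roughly like this (a sketch, not a tested recipe: the column and date boundaries are illustrative, and the actual read call is commented out because it needs your real connection string):

```python
import pandas as pd
# import dask.dataframe as dd  # needed for the actual read_sql_table call

# An empty pandas frame describing the expected output ("meta");
# the columns here are illustrative, not the real table schema.
meta = pd.DataFrame({
    'transaction_date': pd.Series(dtype='datetime64[ns]'),
    'amount': pd.Series(dtype='float64'),
}).set_index('transaction_date')

# Explicit partition boundaries on the index column ("divisions"):
# yearly cut points give len(divisions) - 1 = 4 partitions.
divisions = list(pd.date_range('2021-01-01', '2025-01-01', freq='YS'))

# With both supplied, Dask should not need to look up as much metadata:
# df = dd.read_sql_table('SBB', 'ibm_db_sa://usr:pwd@host:50100/db',
#                        index_col='transaction_date',
#                        meta=meta, divisions=divisions)
```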

Thanks for the reply, but I still don't get it.

If I run this code, I get the expected results:

from datetime import date
import pandas as pd

datainicio = date(2024, 1, 1)
datafim = date.today()
produto = 'Afiliados'
sql = f''' select * from XXXX.XXXXX
        WHERE DATE(DATA_TRANSACAO) >= '{datainicio}'
            AND DATE(DATA_TRANSACAO) <= '{datafim}'
            AND PRODUTO = '{produto}'
        '''
df = pd.read_sql(sql=sql, con=f'ibm_db_sa://{usr}:{pwd}@XXX.XX.com.br:50100/XXXX', chunksize=500_000, index_col='data_transacao')

But if I just switch from pandas to Dask with the same query:

df = dd.read_sql(sql=sql, con=f'ibm_db_sa://{usr}:{pwd}@XXXX.XX.com.br:50100/XXXX', chunksize=500_000,  index_col='data_transacao')

Exception: Statement Execute Failed: [IBM][CLI Driver][DB2] SQL0302N The value of a host variable in the EXECUTE or OPEN statement is out of range for its corresponding use. SQLSTATE=22001 SQLCODE=-302 
...

[SQL: SELECT "SYSIBM"."SYSCOLUMNS"."NAME", "SYSIBM"."SYSCOLUMNS"."TYPENAME", "SYSIBM"."SYSCOLUMNS"."DEFAULT", "SYSIBM"."SYSCOLUMNS"."NULLS", "SYSIBM"."SYSCOLUMNS"."LENGTH", "SYSIBM"."SYSCOLUMNS"."SCALE", "SYSIBM"."SYSCOLUMNS"."GENERATED_ATTR", "SYSIBM"."SYSCOLUMNS"."REMARKS" FROM "SYSIBM"."SYSCOLUMNS" WHERE "SYSIBM"."SYSCOLUMNS"."TBCREATOR" = ? AND "SYSIBM"."SYSCOLUMNS"."TBNAME" = ? ORDER BY "SYSIBM"."SYSCOLUMNS"."COLNO"] [parameters: ('XXXX', " select * from XXXX.XXXXX\n WHERE DATE(DATA_TRANSACAO) >= '2024-01-01' \n AND DATE(DATA_TRANSACAO) <= '2024-07-16'\n AND PRODUTO = 'Afiliados'\n ")]

I tried another approach:

dtypes = {
    'status': 'category',
    'produto': 'category',
    'parceiro': 'category',
    'data_transacao': 'datetime64[ns]',
    'mci': 'Int32',
    'marca': 'category',
    'sku': 'string',
    'cod_transacao': 'string',
    'forma_pagamento': 'category',
    'gmv': 'Float32',
    'receita': 'Float32',
    'cashback': 'Float32'
}

df = dd.from_pandas(pd.DataFrame(columns=dtypes.keys()).astype(dtypes), npartitions=1)

sql = f'''
    SELECT * FROM XXXXX.XXXX limit 1000
'''
ddf = dd.read_sql(sql=sql, con=f'ibm_db_sa://{usr}:{pwd}@XXXX.XX.com.br:50100/XXXXX', chunksize=500_000, index_col='data_transacao', meta =df, npartitions = 10)


Statement Execute Failed: [IBM][CLI Driver][DB2] SQL0551N The statement failed because the authorization ID does not have the required authorization or privilege to perform the operation. Authorization ID: "XXXX". Operation: "SELECT". Object: "SYSIBM.SYSTABCONST". SQLSTATE=42501 SQLCODE=-551

You did not pass `divisions` (which would conflict with `chunksize` and `npartitions`, which conflict with each other anyway), and `meta` should be a pandas frame, not a Dask one.
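In code, that advice amounts to something like this (a sketch; the dtypes are a subset of the ones in the attempt above, the monthly boundaries are illustrative, and the read call is commented out because it needs the real connection):

```python
import pandas as pd
# import dask.dataframe as dd  # for the actual read

# Subset of the dtypes above, for brevity.
dtypes = {
    'status': 'category',
    'data_transacao': 'datetime64[ns]',
    'gmv': 'Float32',
}

# meta must be an empty *pandas* frame, indexed like the expected result.
meta = pd.DataFrame(columns=list(dtypes)).astype(dtypes).set_index('data_transacao')

# Pass divisions alone -- not together with chunksize or npartitions.
divisions = [d.to_pydatetime()
             for d in pd.date_range('2024-01-01', '2024-12-31', freq='MS')]

# ddf = dd.read_sql_table('TABLE_NAME', con, index_col='data_transacao',
#                         meta=meta, divisions=divisions)
```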

Thanks for the reply, but I am still getting the same issue. As far as I've read, my suggested divisions should be the monthly periods of 2024. The index column is a datetime column containing data from 2021 up to 2024.
I still don't understand why Dask tries to query some main/admin database tables (which I do not have access to).

import pandas as pd
import dask.dataframe as dd
from sqlalchemy import create_engine, text

dtypes = {
    'status': 'category',
    'produto': 'category',
    'parceiro': 'category',
    'data_transacao': 'datetime64[ns]',
    'mci': 'Int32',
    'marca': 'category',
    'sku': 'string',
    'cod_transacao': 'string',
    'forma_pagamento': 'category',
    'gmv': 'Float32',
    'receita': 'Float32',
    'cashback': 'Float32'
}

df = pd.DataFrame(columns=dtypes.keys()).astype(dtypes)

sql = f'''
    SELECT * FROM XXXX.XXXXX limit 1000
'''
divisions = pd.date_range(start='2024-01-01', end='2024-12-31', freq='ME').tolist()
divisions = [d.to_pydatetime() for d in divisions]

ddf = dd.read_sql(sql=sql, con=f'ibm_db_sa://{usr}:{pwd}@XX.XX.com.br:50100/XXXX', index_col='data_transacao', meta=df, divisions=divisions)

Same issue.

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
File c:\Users\F3164582\AppData\Local\Programs\Python\Python311\Lib\site-packages\ibm_db_dbi.py:1312, in Cursor._execute_helper(self, parameters)
   1311 try:
-> 1312     return_value = ibm_db.execute(self.stmt_handler, parameters)
   1313 if not return_value:

Exception: Statement Execute Failed: [IBM][CLI Driver][DB2] SQL0551N The statement failed because the authorization ID does not have the required authorization or privilege to perform the operation. Authorization ID: "XXXX". Operation: "SELECT". Object: "SYSIBM.SYSTABCONST". SQLSTATE=42501 SQLCODE=-551

During handling of the above exception, another exception occurred:

ProgrammingError                          Traceback (most recent call last)
File c:\Users\F3164582\AppData\Local\Programs\Python\Python311\Lib\site-packages\sqlalchemy\engine\base.py:1967, in Connection._exec_single_context(self, dialect, context, statement, parameters)
   1966 if not evt_handled:
-> 1967     self.dialect.do_execute(
   1968         cursor, str_statement, effective_parameters, context
   1969     )
   1971 if self._has_events or self.engine._has_events:

File c:\Users\F3164582\AppData\Local\Programs\Python\Python311\Lib\site-packages\ibm_db_sa\ibm_db.py:138, in DB2Dialect_ibm_db.do_execute(self, cursor, statement, parameters, context)
    137 else:
--> 138     cursor.execute(statement, parameters)

File c:\Users\F3164582\AppData\Local\Programs\Python\Python311\Lib\site-packages\ibm_db_dbi.py:1410, in Cursor.execute(self, operation, parameters)
   1409 self._set_cursor_helper()

...

[SQL: SELECT "SYSIBM"."SYSKEYCOLUSE"."CONSTNAME", "SYSIBM"."SYSKEYCOLUSE"."COLNAME" FROM "SYSIBM"."SYSKEYCOLUSE", "SYSIBM"."SYSTABCONST" WHERE "SYSIBM"."SYSKEYCOLUSE"."CONSTNAME" = "SYSIBM"."SYSTABCONST"."CONSTNAME" AND "SYSIBM"."SYSTABCONST"."TBNAME" = ? AND "SYSIBM"."SYSTABCONST"."TBCREATOR" = ? AND "SYSIBM"."SYSTABCONST"."TYPE" = ? ORDER BY "SYSIBM"."SYSKEYCOLUSE"."CONSTNAME"] [parameters: ('\n SELECT * FROM XXXX.XXXX limit 1000\n', 'DB2I023A', 'U')] (Background on this error at: https://sqlalche.me/e/20/f405)

For those who face the same issue: I solved it by building the SQL query directly in SQLAlchemy, so that Dask does not need to reflect the table metadata itself.

import sqlalchemy as sa
from sqlalchemy import MetaData, Table, Column, String, DateTime, Float

metadata = MetaData()
shoppingbb = Table('SHOPPINGBB', metadata,
    Column('status', String(50), nullable=False),
    Column('produto', String(50), nullable=False),
    Column('parceiro', String(50), nullable=True),
    Column('cod_transacao', String(50), nullable=True),
    Column('marca', String(50), nullable=False),
    Column('forma_pagamento', String(50), nullable=True),
    Column('data_transacao', DateTime, nullable=False),
    Column('mci', Float, nullable=True),
    Column('gmv', Float, nullable=False),
    Column('receita', Float, nullable=True),
    Column('cashback', Float, nullable=True),
    extend_existing=True, schema='DB2I023A')

stmt = sa.select(shoppingbb)
ddf = dd.read_sql(sql=stmt, index_col='data_transacao', con=f'ibm_db_sa://{usr}:{pwd}@xxxx.xx.com.br:50100/xxxxx')
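One convenient side effect of the explicit `Table` is that you can inspect (and restrict) the generated SQL without touching the database. A small sketch, reusing only a couple of the columns above (the date filter is illustrative):

```python
import sqlalchemy as sa
from sqlalchemy import MetaData, Table, Column, DateTime, Float

metadata = MetaData()
shoppingbb = Table('SHOPPINGBB', metadata,
    Column('data_transacao', DateTime, nullable=False),
    Column('gmv', Float, nullable=False),
    schema='DB2I023A')

# Filtering server-side keeps each Dask partition small.
stmt = sa.select(shoppingbb).where(
    shoppingbb.c.data_transacao >= sa.literal('2024-01-01'))

# Compile with the generic dialect just to see the SQL; pass `stmt`
# itself to dd.read_sql as in the snippet above.
compiled = str(stmt)
```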