KeyError while using the read_parquet method

I am getting a KeyError when using the read_parquet method. First, I save a Dask DataFrame by calling the to_parquet method, and then I try to read the Parquet dataset back by calling the read_parquet method. The error occurs only when the column I use as a filter in the read_parquet method is one of the columns I used to construct the directory-based partitioning in the to_parquet method. Below, I show part of the code.

from datetime import date, datetime, time, timezone
from typing import Any, ClassVar, List, Tuple, Type, Optional, Literal, Union

import dask.dataframe as dd
import pandas as pd
import pyarrow
import sqlalchemy.types as sqlatypes
from pyarrow import Schema
from pyarrow.dataset import Partitioning, partitioning
from sqlalchemy import (
    Boolean,
    Column,
    Date,
    DateTime,
    Float,
    Integer,
    MetaData,
    String,
    Table,
    Time,
)

LoadFilterOperator = Literal["=", "==", "!=", ">", "<", ">=", "<=", "in", "not in"]
LoadFilterTerm = Tuple[str, LoadFilterOperator, Union[Any, List[Any]]]
LoadFilter = Union[List[LoadFilterTerm], List[List[LoadFilterTerm]]]

class Dataset:
    name: ClassVar[str]
    table: ClassVar[Table]
    index_column: ClassVar[str]
    partitions: ClassVar[List[str]]

class _dataset_test(Dataset):
    name = "Test_dataset"
    table = Table(
        name,
        MetaData(),
        Column("id", Integer, primary_key=True),
        Column("integer", Integer),
        Column("integer_null", Integer),
        Column("string", String),
        Column("string_null", String),
        Column("float", Float),
        Column("float_null", Float),
        Column("boolean", Boolean),
        Column("boolean_null", Boolean),
        Column("date", Date),
        Column("date_null", Date),
        Column("time", Time),
        Column("time_null", Time),
        Column("datetime", DateTime),
        Column("datetime_null", DateTime),
    )
    index_column = "id"
    partitions = ["integer", "string", "float"]

_df_test = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "integer": [1, 2, 3],
            "integer_null": [1, 2, None],
            "string": ["a", "b", "c"],
            "string_null": ["a", "b", None],
            "float": [1.0, 2.0, 3.8],
            "float_null": [1.0, 2.4, None],
            "boolean": [True, False, True],
            "boolean_null": [True, False, None],
            "date": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "date_null": [date(2020, 1, 1), date(2020, 1, 2), None],
            "time": [time(1, 0), time(2, 0), time(3, 0)],
            "time_null": [time(1, 0), time(2, 0), None],
            "datetime": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 1, 2, 0, 0),
                datetime(2020, 1, 3, 0, 0),
            ],
            "datetime_null": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 1, 2, 0, 0),
                None,
            ],
        }
    )
    .convert_dtypes()
    .set_index("id")
)
_df_test["datetime"] = _df_test["datetime"].dt.tz_localize("UTC")
_df_test["datetime_null"] = _df_test["datetime_null"].dt.tz_localize("UTC")

def extract_pyarrow_schema(
    table: Table, partitions: List[str] | None = None
) -> Tuple[Schema, Partitioning | None]:
    pyarrow_types = [
        pyarrow.field(column.name, from_sqlalchemy_to_pyarrow(column.type))
        for column in table.columns
    ]
    pyarrow_schema = pyarrow.schema(pyarrow_types)

    if partitions:
        partition_types = [
            pyarrow.field(field.name, field.type)
            for field in pyarrow_types
            if field.name in partitions
        ]
        partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
    else:
        partition_schema = None

    return pyarrow_schema, partition_schema

def from_sqlalchemy_to_pyarrow(sqltype: sqlatypes.TypeEngine) -> pyarrow.DataType:
    if isinstance(sqltype, sqlatypes.Integer):
        return pyarrow.int64()
    if isinstance(sqltype, sqlatypes.String):
        return pyarrow.string()
    if isinstance(sqltype, sqlatypes.Float):
        return pyarrow.float64()
    if isinstance(sqltype, sqlatypes.Boolean):
        return pyarrow.bool_()
    if isinstance(sqltype, sqlatypes.DateTime):
        return pyarrow.timestamp("ns", tz=timezone.utc)
    if isinstance(sqltype, sqlatypes.Date):
        return pyarrow.date32()
    if isinstance(sqltype, sqlatypes.Time):
        return pyarrow.time64("ns")
    raise NotImplementedError(f"Unsupported type: {sqltype}")

def save(df: pd.DataFrame, dataset: Type[Dataset]) -> None:
    dataframe = dd.from_pandas(df, npartitions=3)
    dataframe = dataframe.reset_index()
    dataframe = dataframe.persist()

    kwargs = dict(
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
    )

    if dataset.partitions:
        kwargs["partition_on"] = dataset.partitions
    if dataset.table is not None:
        schema, _ = extract_pyarrow_schema(dataset.table)
        kwargs["schema"] = schema

    try:
        dataframe.to_parquet("path/to/output", **kwargs)
    except FileNotFoundError:
        kwargs["append"] = False
        dataframe.to_parquet("path/to/output", **kwargs)

def read(
    dataset: Type[Dataset],
    filters: Optional[LoadFilter] = None,
    columns: Optional[List[str]] = None,
) -> dd.DataFrame:
    kwargs = dict(
        engine="pyarrow",
    )
    if filters:
        kwargs["filters"] = filters
    if columns:
        kwargs["columns"] = columns
    if dataset.table is not None:
        _, partition_schema = extract_pyarrow_schema(dataset.table, dataset.partitions)
        if partition_schema:
            kwargs["dataset"] = {
                "partitioning": {"flavor": "hive", "schema": partition_schema.schema}
            }
    try:
        df = dd.read_parquet("path/to/output", **kwargs)
    except (FileNotFoundError, ValueError) as e:
        print("The error is: ", e)
    return df

The save and read functions are the most important parts.

When I run the following code:

save(_df_test, _dataset_test)

I get the following directory structure:

path/to/output/
├── integer=1/
│   ├── string=a/
│   │   ├── float=1.0/
│   │   │   ├── part.0.parquet
├── integer=2/
│   ├── string=b/
│   │   ├── float=2.0/
│   │   │   ├── part.1.parquet
├── integer=3/
│   ├── string=c/
│   │   ├── float=3.8/
│   │   │   ├── part.3.parquet

But, when I try to read the Parquet Dataset, I get the KeyError:

df = read(_dataset_test, [("integer", ">", 2)], ["id"])
df = df.compute()

I should receive the following dataframe:

pd.DataFrame({"id": [3]}, dtype="Int64").set_index("id")

The error occurs in the _get_rg_statistics(row_group, col_name) function, which lives in the dask/dataframe/io/parquet/arrow.py file.

Note that the integer field was used both to filter the data and to construct the directory-based partitioning. I am using version 2023.4.1 of the dask library.

Hi @lgfc, welcome to Dask community!

Thanks for the detailed post and the effort you put into it. Unfortunately, when copy-pasting your code, I’m running into syntax problems due to indentation. Moreover, it’s a bit long. Could you extract a minimal reproducer from it so that we can play with it?

It would be nice to have the entire stack trace too.

And finally, does it work as expected using Pandas only?
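
For instance, a check along these lines should be enough (a sketch, reusing the path, filter and columns from your post):

import pandas as pd

# Read the partitioned dataset written above directly with pandas,
# applying the same filter on the "integer" partition column.
pdf = pd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=[("integer", ">", 2)],
    columns=["id"],
)
print(pdf)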

Hi @guillaumeeb!

I’d like to thank you for your help. I’ve changed some parts of the code in order to make it more readable.

Below, I show the new code

import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning

_df_test = (
    pd.DataFrame(
        {
            "id": [1, 2, 3],
            "integer": [1, 2, 3],
            "integer_null": [1, 2, None],
            "string": ["a", "b", "c"],
            "string_null": ["a", "b", None],
            "float": [1.0, 2.0, 3.8],
            "float_null": [1.0, 2.4, None],
        }
    )
    .convert_dtypes()
    .set_index("id")
)

def extract_pyarrow_schema(partitions=None):
    pyarrow_types = [
        pyarrow.field("id", pyarrow.int64()),
        pyarrow.field("integer", pyarrow.int64()),
        pyarrow.field("integer_null", pyarrow.int64()),
        pyarrow.field("string", pyarrow.string()),
        pyarrow.field("string_null", pyarrow.string()),
        pyarrow.field("float", pyarrow.float64()),
        pyarrow.field("float_null", pyarrow.float64()),
    ]
    pyarrow_schema = pyarrow.schema(pyarrow_types)

    if partitions:
        partition_types = [
            pyarrow.field(field.name, field.type)
            for field in pyarrow_types
            if field.name in partitions
        ]
        partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
    else:
        partition_schema = None
    
    return pyarrow_schema, partition_schema

def save(df):
    data = dd.from_pandas(df, npartitions=3)
    data = data.reset_index()
    data = data.persist()

    kwargs = dict(
        engine="pyarrow",
        write_index=False,
        append=True,
        write_metadata_file=False,
    )
    kwargs["partition_on"] = ["integer", "string", "float"]
    schema, _ = extract_pyarrow_schema()
    kwargs["schema"] = schema

    try:
        data.to_parquet("path/to/output/", **kwargs)
    except FileNotFoundError:
        kwargs["append"] = False
        data.to_parquet("path/to/output/", **kwargs)

def read(filters=None, columns=None):
    kwargs = dict(engine="pyarrow")
    if filters:
        kwargs["filters"] = filters
    if columns:
        kwargs["columns"] = columns
    
    _, partition_schema = extract_pyarrow_schema(["integer", "string", "float"])
    if partition_schema:
        kwargs["dataset"] = {
            "partitioning": {"flavor": "hive", "schema": partition_schema.schema}
        }
    
    try:
        df = dd.read_parquet("path/to/output", **kwargs)
    except (FileNotFoundError, ValueError) as e:
        print("The error is ", e)
    return df

save(_df_test)

df = read([("integer", ">", 2)], ["id"])
print(df.compute())

Below, I show the entire stack trace

The code works as expected using Pandas only.

Could you show me how you would do it with Pandas?

Hi, @guillaumeeb! The code blocks were a little bit long. I’ve tried to simplify them as much as possible. Below, I’ll show the code with Pandas and the code with Dask again.

The code with Dask:

import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning

_df_test = pd.DataFrame(
    {"id": [1, 2, 3], "integer": [1, 2, 3], "string": ["a", "b", "c"], "float": [1.0, 2.0, 3.8]}
).convert_dtypes()
partitions = ["integer", "string", "float"]
pyarrow_types = [
    pyarrow.field("id", pyarrow.int64()),
    pyarrow.field("integer", pyarrow.int64()),
    pyarrow.field("string", pyarrow.string()),
    pyarrow.field("float", pyarrow.float64()),
]
partition_types = [pyarrow.field(field.name, field.type) for field in pyarrow_types if field.name in partitions]
pyarrow_schema = pyarrow.schema(pyarrow_types)
partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")

def save(df):
    data = dd.from_pandas(df, npartitions=3)
    data.to_parquet("path/to/output/", engine="pyarrow", write_index=False, append=True, write_metadata_file=False, partition_on=partitions, schema=pyarrow_schema)

def read(filters, columns):
    df = dd.read_parquet("path/to/output", engine="pyarrow", filters=filters, columns=columns, dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}})
    return df

save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df.compute())

Now, the code using Pandas.

import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow import Schema
from pyarrow.dataset import partitioning

_df_test = pd.DataFrame(
    {"id": [1, 2, 3], "integer": [1, 2, 3], "string": ["a", "b", "c"], "float": [1.0, 2.0, 3.8]}
).convert_dtypes()
partitions = ["integer", "string", "float"]
pyarrow_types = [
    pyarrow.field("id", pyarrow.int64()),
    pyarrow.field("integer", pyarrow.int64()),
    pyarrow.field("string", pyarrow.string()),
    pyarrow.field("float", pyarrow.float64()),
]
partition_types = [pyarrow.field(field.name, field.type) for field in pyarrow_types if field.name in partitions]
pyarrow_schema = pyarrow.schema(pyarrow_types)
partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")

def save(df):
    data = dd.from_pandas(df, npartitions=1)
    data.to_parquet("path/to/output/", engine="pyarrow", write_index=False, append=True, write_metadata_file=False, partition_on=partitions, schema=partition_schema)

def read(filters=None, columns=None):
    df = pd.read_parquet("path/to/output", engine="pyarrow", filters=filters, columns=columns, partitioning=partition_schema)
    return df

save(_df_test)
df = read([("integer", ">", 2)], ["id"])
print(df)

In the code with Pandas, I use Dask to save the dataset, to ensure that the to_parquet method behaves the same way for both Pandas and Dask DataFrames. The Pandas code is very similar to the Dask code. There are two important differences, both in the read function: first, the way I build the partitioning parameter is slightly different, and second, which read_parquet function I call (pd.read_parquet instead of dd.read_parquet).
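
To make the two differences concrete, here are the two read calls side by side (same names as in the code blocks above; this snippet assumes those definitions are already in scope):

# Dask: the partitioning is passed through the `dataset` keyword argument
df_dask = dd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=filters,
    columns=columns,
    dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}},
)

# Pandas: the partitioning object is passed directly
df_pandas = pd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=filters,
    columns=columns,
    partitioning=partition_schema,
)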

There seems to be a typo in the save function on the Pandas side: I just replaced schema=partition_schema with the code from the Dask code block.

Then, I’m not sure why you are not using the same set of parameters for the Dask read_parquet call as for Pandas? I just did that (replacing the dataset={"partitioning": {"flavor": "hive", "schema": partition_schema.schema}} argument) and it worked, the same as with Pandas.
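
Concretely, something like this is what I ran (a sketch, assuming the dataset argument is simply dropped so that Dask infers the hive partitioning from the directory names on its own):

# Same read as above, but without the custom `dataset` partitioning schema;
# the hive-style directories (integer=.../string=.../float=...) are detected automatically.
df = dd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=[("integer", ">", 2)],
    columns=["id"],
)
print(df.compute())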

Did I miss something?

Yes, there is a typo. The correct version would be schema=pyarrow_schema.

I’m not using the same set of parameters for Dask as for Pandas because, when I do, I get another error: Dask converts the partitioned columns to categorical dtypes even when they are not categorical. For example, if I replace df = read([("integer", ">", 2)], ["id"]) with df = read([("float", ">", 1.0), ("float", ">", 3.0)], ["id"]), I get the following error:

The Dask documentation states that partitioned columns will not be converted to categorical dtypes when a custom partitioning schema is specified in this way, i.e., dataset={"partitioning": {"flavor": "hive", "schema": ...}}.
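
For reference, this is roughly the call that produces that error (a sketch, assuming the dataset argument is dropped as you suggested, so the partition columns come back as categorical dtypes):

# Without the custom `dataset` partitioning schema, the partition columns are read
# back as categoricals, and this numeric filter on the "float" partition then fails for me.
df = dd.read_parquet(
    "path/to/output",
    engine="pyarrow",
    filters=[("float", ">", 1.0), ("float", ">", 3.0)],
    columns=["id"],
)
print(df.compute())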

I’m having the exact same issue: it seems that Dask fails to filter by columns that are partitions when we pass dataset={"partitioning": {"flavor": "hive", "schema": pa.schema(...)}} to read_parquet.

Have you found any solution, @guillaumeeb?

Sorry, I have to admit I really don’t know what the correct way to go is here. cc @rjzamora @fjetter.

@lgfc @bressanmarcos - Do you still have problems with the latest version of dask? I ran the reproducer above and got the answer I’d expect:

   id
0   3

Handling of hive partitions has certainly changed a lot since the April release, but I’ll be happy to investigate/fix anything that is still broken.

You probably need 2023.7.1 or later to get the behavior you want (I’m assuming you need 10353 here).
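
For anyone landing here with the same error, a quick way to confirm which dask version is installed (just the standard version attribute, nothing specific to this issue):

import dask

# Hive-partition handling changed a lot after the 2023.4.1 release used in the original post,
# so anything older than 2023.7.1 is worth upgrading before digging further.
print(dask.__version__)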