I am getting a KeyError while using the read_parquet method. First, I save a Dask DataFrame by calling the to_parquet method, and then I try to read the Parquet dataset back by calling the read_parquet method. The error occurs only when the filter I pass to read_parquet uses a column that I also used to construct the directory-based partitioning in the to_parquet call. Below is the relevant part of the code:
from datetime import date, datetime, time, timezone
from typing import Any, ClassVar, List, Tuple, Type, Optional, Literal, Union
import dask.dataframe as dd
import pandas as pd
import pyarrow
import sqlalchemy.types as sqlatypes
from pyarrow import Schema
from pyarrow.dataset import Partitioning, partitioning
from sqlalchemy import (
Boolean,
Column,
Date,
DateTime,
Float,
Integer,
MetaData,
String,
Table,
Time,
)
LoadFilterOperator = Literal["=", "==", "!=", ">", "<", ">=", "<=", "in", "not in"]
LoadFilterTerm = Tuple[str, LoadFilterOperator, Union[Any, List[Any]]]
LoadFilter = Union[List[LoadFilterTerm], List[List[LoadFilterTerm]]]
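# Examples of values matching LoadFilter (my own annotation, not used directly below):
#   [("integer", ">", 2)]                              # a single AND-group of filter terms
#   [[("integer", ">", 2)], [("string", "==", "c")]]   # an OR of AND-groups (DNF), as read_parquet's filters= expects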
class Dataset:
name: ClassVar[str]
table: ClassVar[Table]
index_column: ClassVar[str]
partitions: ClassVar[List[str]]
class _dataset_test(Dataset):
name = "Test_dataset"
table = Table(
name,
        MetaData(),
Column("id", Integer, primary_key=True),
Column("integer", Integer),
Column("integer_null", Integer),
Column("string", String),
Column("string_null", String),
Column("float", Float),
Column("float_null", Float),
Column("boolean", Boolean),
Column("boolean_null", Boolean),
Column("date", Date),
Column("date_null", Date),
Column("date_null", Date),
Column("time", Time),
Column("time_null", Time),
Column("datetime", DateTime),
Column("datetime_null", DateTime),
)
index_column = "id"
partitions = ["integer", "string", "float"]
_df_test = (
pd.DataFrame(
{
"id": [1, 2, 3],
"integer": [1, 2, 3],
"integer_null": [1, 2, None],
"string": ["a", "b", "c"],
"string_null": ["a", "b", None],
"float": [1.0, 2.0, 3.8],
"float_null": [1.0, 2.4, None],
"boolean": [True, False, True],
"boolean_null": [True, False, None],
"date": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
"date_null": [date(2020, 1, 1), date(2020, 1, 2), None],
"time": [time(1, 0), time(2, 0), time(3, 0)],
"time_null": [time(1, 0), time(2, 0), None],
"datetime": [
datetime(2020, 1, 1, 0, 0),
datetime(2020, 1, 2, 0, 0),
datetime(2020, 1, 3, 0, 0),
            ],
"datetime_null": [
datetime(2020, 1, 1, 0, 0),
datetime(2020, 1, 2, 0, 0),
None,
]
}
)
.convert_dtypes()
.set_index("id")
)
_df_test["datetime"] = _df_test["datetime"].dt.tz_localize("UTC")
_df_test["datetime_null"] = _df_test["datetime_null"].dt.tz_localize("UTC")
def extract_pyarrow_schema(
table: Table, partitions: List[str] | None = None
) -> Tuple[Schema, Partitioning | None]:
    pyarrow_types = [
pyarrow.field(column.name, from_sqlalchemy_to_pyarrow(column.type))
for column in table.columns
]
pyarrow_schema = pyarrow.schema(pyarrow_types)
if partitions:
partition_types = [
pyarrow.field(field.name, field.type)
for field in pyarrow_types
if field.name in partitions
]
partition_schema = partitioning(pyarrow.schema(partition_types), flavor="hive")
else:
partition_schema = None
return pyarrow_schema, partition_schema
def from_sqlalchemy_to_pyarrow(sqltype: sqlatypes.TypeEngine) -> pyarrow.DataType:
if isinstance(sqltype, sqlatypes.Integer):
return pyarrow.int64()
if isinstance(sqltype, sqlatypes.String):
return pyarrow.string()
if isinstance(sqltype, sqlatypes.Float):
return pyarrow.float64()
if isinstance(sqltype, sqlatypes.Boolean):
return pyarrow.bool_()
if isinstance(sqltype, sqlatypes.DateTime):
return pyarrow.timestamp("ns", tz=timezone.utc)
if isinstance(sqltype, sqlatypes.Date):
return pyarrow.date32()
if isinstance(sqltype, sqlatypes.Time):
return pyarrow.time64("ns")
raise NotImplementedError(f"Unsupported type: {sqltype}")
def save(df: pd.DataFrame, dataset: Type[Dataset]) -> None:
dataframe = dd.from_pandas(df, npartitions=3)
dataframe = dataframe.reset_index()
dataframe = dataframe.persist()
kwargs = dict(
engine="pyarrow",
write_index=False,
append=True,
        write_metadata_file=False,
)
if dataset.partitions:
kwargs["partition_on"] = dataset.partitions
if dataset.table is not None:
schema, _ = extract_pyarrow_schema(dataset.table)
kwargs["schema"] = schema
try:
dataframe.to_parquet("path/to/output", **kwargs)
except FileNotFoundError:
kwargs["append"] = False
dataframe.to_parquet("path/to/output", **kwargs)
def read(
dataset: Type[Dataset],
filters: Optional[LoadFilter] = None,
    columns: Optional[List[str]] = None,
) -> dd.DataFrame:
    kwargs = dict(
        engine="pyarrow",
    )
if filters:
kwargs["filters"] = filters
if columns:
kwargs["columns"] = columns
if dataset.table is not None:
_, partitioning = extract_pyarrow_schema(dataset.table, dataset.partitions)
if partitioning:
kwargs["dataset"] = {
"partitioning": {"flavor": "hive", "schema": partitioning.schema}
}
try:
df = dd.read_parquet("path/to/output", **kwargs)
except (FileNotFoundError, ValueError) as e:
print("The error is: ",e)
return df
The save and read functions are the most important part.
When I run the following code:
save(_df_test, _dataset_test)
I get the following directory structure:
path/to/output/
├── integer=1/
│   ├── string=a/
│   │   ├── float=1.0/
│   │   │   ├── part.0.parquet
├── integer=2/
│   ├── string=b/
│   │   ├── float=2.0/
│   │   │   ├── part.1.parquet
├── integer=3/
│   ├── string=c/
│   │   ├── float=3.8/
│   │   │   ├── part.2.parquet
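For completeness, this is a minimal sketch of how I list the written files to confirm the hive-style layout (it assumes a local filesystem and the same output path as above):
import glob

for path in sorted(glob.glob("path/to/output/**/*.parquet", recursive=True)):
    print(path)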
But when I try to read the Parquet dataset, I get a KeyError:
df = read(_dataset_test, [("integer", ">", 2)], ["id"])
df = df.compute()
I expect to receive the following dataframe:
pd.DataFrame({"id": [3]}, dtype="Int64").set_index("id")
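For reference, here is a minimal sketch of how the expected result can be checked by reading the same directory directly with pyarrow (hive partitioning inferred from the paths); this is only an illustration, not part of my application code:
import pyarrow.dataset as ds

pa_dataset = ds.dataset("path/to/output", format="parquet", partitioning="hive")
table = pa_dataset.to_table(columns=["id"], filter=ds.field("integer") > 2)
print(table.to_pandas())  # should contain only the row with id == 3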
The error occurs in the _get_rg_statistics(row_group, col_name) method, which is defined in the dask/dataframe/io/parquet/arrow.py file.
Note that the integer field was used both to filter the data and to construct the directory-based partitioning. I am using version 2023.4.1 of the dask library.
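In case it helps, below is a stripped-down sketch, without the SQLAlchemy layer, of what I believe are the essential ingredients: partition_on in to_parquet, and a filter on that same column together with an explicit hive partitioning schema in read_parquet. The path is a placeholder and I have not re-run this exact snippet, so treat it as a sketch rather than a verified reproducer.
import dask.dataframe as dd
import pandas as pd
import pyarrow
from pyarrow.dataset import partitioning

pdf = pd.DataFrame({"id": [1, 2, 3], "integer": [1, 2, 3]}).convert_dtypes()
ddf = dd.from_pandas(pdf, npartitions=3)
ddf.to_parquet(
    "path/to/minimal",
    engine="pyarrow",
    write_index=False,
    write_metadata_file=False,
    partition_on=["integer"],
)

# Same explicit hive partitioning schema that extract_pyarrow_schema builds for the "integer" column.
hive = partitioning(pyarrow.schema([pyarrow.field("integer", pyarrow.int64())]), flavor="hive")
ddf2 = dd.read_parquet(
    "path/to/minimal",
    engine="pyarrow",
    columns=["id"],
    filters=[("integer", ">", 2)],
    dataset={"partitioning": {"flavor": "hive", "schema": hive.schema}},
)
print(ddf2.compute())  # expected: a single row with id == 3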