Hi,
I’m trying to query hive-partitioned data as fast as I can, ideally without actually accessing the files. I’m not even sure this is possible with Dask, but would appreciate any guidance.
Suppose I have a set of files:
s3://my_bucket/path/to/dataset/col1=0/col2=0/part.01.parquet
s3://my_bucket/path/to/dataset/col1=0/col2=0/part.02.parquet
s3://my_bucket/path/to/dataset/col1=1/col2=1/part.01.parquet
...
I’m trying to grab the min/max of each column, but without actually reading the Parquet files.
If I do:
import dask.dataframe as dd
input_data = "s3://my_bucket/path/to/dataset/"
dd_df = dd.read_parquet(input_data, dataset={"partitioning": {"flavor": "filename"}})
dd_df["col1"].max().compute() # slow
then my dataset is not treated as hive-partitioned, and to grab the min/max of col1 or col2 I need to open each individual file. This operation is understandably slow.
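(To double-check what actually gets discovered, here is a small sketch that only inspects the schema pyarrow reports for each partitioning setting; it uses the same placeholder bucket path as above and assumes pyarrow is installed.)
import pyarrow.dataset as ds

input_data = "s3://my_bucket/path/to/dataset/"  # same placeholder path as above
for flavor in (None, "hive"):
    # partitioning=None leaves the col1=... / col2=... directory keys undiscovered;
    # "hive" should surface col1 and col2 as extra columns in the schema
    dset = ds.dataset(input_data, format="parquet", partitioning=flavor)
    print(flavor, dset.schema.names)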
Now, I would imagine that if I do:
import dask.dataframe as dd
input_data = "s3://my_bucket/path/to/dataset/"
dd_df = dd.read_parquet(input_data, dataset={"partitioning": {"flavor": "hive"}})
dd_df["col1"].max().compute() # slow, should be fast?
then, when computing the max, col1 or col2 could be inferred directly from the hive partition names, with no need to open every single file, since every row within a given partition has the same col1/col2 value. But apparently that is not the case: Dask still accesses every single file.
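For reference, the path-only computation I was hoping read_parquet would boil down to looks like this when done by hand. This is just a sketch assuming s3fs is installed and the placeholder bucket layout above; the regex pulls the col1=... key out of each object path, so no Parquet file is ever opened.
import re
import s3fs

fs = s3fs.S3FileSystem()
paths = fs.glob("my_bucket/path/to/dataset/col1=*/col2=*/*.parquet")

# e.g. ".../col1=7/col2=3/part.01.parquet" -> 7
col1_values = {int(re.search(r"col1=([^/]+)", p).group(1)) for p in paths}
print(max(col1_values))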
I also tried it with Polars:
import polars as pl
import pyarrow as pa
import pyarrow.dataset as ds
schema = pa.schema([("col1", pa.int64()), ("col2", pa.int64())])
dset = ds.dataset(input_data, format="parquet", partitioning="hive", schema=schema)
df_pl = pl.scan_pyarrow_dataset(dset)
df_pl.select("col1").max().collect(streaming=True) # very slow
and actually got slower results than Dask.
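For completeness, the behaviour I'm after would look roughly like the hand-rolled version below, which walks the pyarrow dataset's fragments and only uses the hive keys parsed from the directory names, never the Parquet data itself. This is a sketch assuming a recent pyarrow where pyarrow.dataset.get_partition_keys() is public (older releases only have the private _get_partition_keys helper), and it reuses the dset object built above.
# Only the partition expressions (parsed from the directory names) are used;
# the Parquet files themselves are never read.
col1_values = [
    ds.get_partition_keys(frag.partition_expression)["col1"]
    for frag in dset.get_fragments()
]
print(max(col1_values))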