Speeding up hive partitioned queries

Hi,

I’m trying to query hive-partitioned data as fast as I can, ideally without actually accessing the files. I’m not even sure this is possible with Dask, but would appreciate any guidance.

Suppose I have a set of files:

s3://my_bucket/path/to/dataset/col1=0/col2=0/part.01.parquet
s3://my_bucket/path/to/dataset/col1=0/col2=0/part.02.parquet
s3://my_bucket/path/to/dataset/col1=1/col2=1/part.01.parquet
...

I’m trying to grab the min/max of each partition column without actually reading the Parquet files.

If I do:

import dask.dataframe as dd

input_data = "s3://my_bucket/path/to/dataset/"
dd_df = dd.read_parquet(input_data, dataset={"partitioning": {"flavor": "filename"}})
dd_df["col1"].max().compute()  # slow

then my dataset is not treated as hive-partitioned, and to grab the min/max of col1 or col2 I need to open each individual file. This operation is understandably slow.

Now, I would imagine that if I do:

import dask.dataframe as dd

input_data = "s3://my_bucket/path/to/dataset/"
dd_df = dd.read_parquet(input_data, dataset={"partitioning": {"flavor": "hive"}})
dd_df["col1"].max().compute()  # slow, should be fast?

then when computing the max, Dask could infer col1 or col2 directly from the hive partition directories without opening every single file, since every row in a given partition has the same col1/col2 value. But apparently that is not what happens: Dask still accesses every single file.

I also tried it with Polars:

import polars as pl
import pyarrow as pa
import pyarrow.dataset as ds

schema = pa.schema([("col1", pa.int64()), ("col2", pa.int64())])
dset = ds.dataset(input_data, format="parquet", partitioning="hive", schema=schema)
df_pl = pl.scan_pyarrow_dataset(dset)
df_pl.select("col1").max().collect(streaming=True)  # very slow

and actually got slower results than with Dask.
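
For completeness, a variant I have not benchmarked would be to let Polars handle the hive layout itself via scan_parquet (hive_partitioning is a real scan_parquet argument, but whether this glob-based scan avoids opening the files for this query on a given Polars version is an assumption on my part):

import polars as pl

# Untested sketch: scan the hive layout directly with Polars instead of going
# through a PyArrow dataset; the glob pattern is assumed to match the layout above.
df_pl = pl.scan_parquet(
    "s3://my_bucket/path/to/dataset/**/*.parquet",
    hive_partitioning=True,
)
df_pl.select(pl.col("col1").max()).collect()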

Hi @ccostadv,

I’m really not sure you can do that with Dask; I don’t think there is a mechanism that can read only the folder structure to get the maximum of a given partitioning column…

That said, you should be perfectly able to do it without Dask, though I understand this would make your workflow a bit awkward.
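
Something along these lines should do it (just a sketch that lists the folder structure with fsspec and parses the hive keys out of the paths, assuming integer partition values as in your example):

import fsspec

# Sketch: only list the S3 paths and parse the col1=... segments from them;
# no Parquet file is ever opened.
fs = fsspec.filesystem("s3")
paths = fs.glob("my_bucket/path/to/dataset/col1=*/col2=*/*.parquet")
col1_values = [
    int(segment.split("=", 1)[1])
    for path in paths
    for segment in path.split("/")
    if segment.startswith("col1=")
]
print(max(col1_values))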

cc @martindurant

Interestingly, Dask does have exactly this metadata internally after dd.read_parquet but before the (slow) compute. For example, if you were to do ddf[ddf.col1 == 0], working on this section of the data should not need to read any of the data files outside the given directory. I’m not sure how to extract this information from the dataframe object, though.
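
One thing you can do explicitly is push a filter on the partition column into read_parquet, which should prune whole hive directories before any data is read (it doesn’t give you the min/max by itself, but it shows where that partition metadata gets used):

import dask.dataframe as dd

# Only files under col1=0 should end up in the graph; the other hive
# directories are dropped using the partition metadata alone.
ddf = dd.read_parquet(
    "s3://my_bucket/path/to/dataset/",
    filters=[("col1", "==", 0)],
)
ddf.head()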

Without Dask, you can do:

import fsspec
import fastparquet
import pyarrow.parquet

s3 = fsspec.filesystem("s3")
fastparquet.ParquetFile("s3://my_bucket/path/to/dataset", fs=s3).cats

# or
frags = pyarrow.parquet.ParquetDataset("s3://my_bucket/path/to/dataset").fragments
# ... something with the .partition_expression

to get the metadata without reading the data.
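
For example, with the fastparquet route the whole thing could look like this (a sketch; .cats maps each partition column to the values found in the directory names):

import fsspec
import fastparquet

# Sketch: .cats is built from the directory names alone, so no Parquet data
# pages need to be read to answer the max() question.
s3 = fsspec.filesystem("s3")
pf = fastparquet.ParquetFile("s3://my_bucket/path/to/dataset", fs=s3)
print(max(int(v) for v in pf.cats["col1"]))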

Hi @martindurant and @guillaumeeb, thanks for your help.

I would like to understand a bit better how Dask does that computation; is there any location in the code either of you could point me towards?

Regarding the metadata, I was able to put something together using ParquetDataset like you suggested. Here is an example:

import pyarrow.dataset as ds
import pyarrow.parquet as pq

input_data = "s3://my_bucket/path/to/dataset/"
dset = pq.ParquetDataset(input_data, partitioning="hive")
max(ds.get_partition_keys(frag.partition_expression)["col1"] for frag in dset.fragments)

I would suppose the methods called from here: https://github.com/dask/dask/blob/main/dask/dataframe/io/parquet/arrow.py