`read_parquet` filters not working with query optimizer

Hi,

First of all thank you for dask. :pray:

I’m currently migrating a code base from dask 2023.5.x to 2024.7.x, and I might have hit a bug with the optimizer. I’m not sure enough, though, so I didn’t feel confident opening a GitHub issue.

I have a dask workflow which essentially does this:

dask.dataframe.read_parquet(
    ...,
    filters=[... some filters ...],
).map_partitions(
    ...
).to_parquet()

When running this operation with the optimizer, it first tries to repartition the `read_parquet` output to a single partition, which in itself could make sense.

However, in the process of repartitioning, I suspect it doesn’t apply the `filters` argument to the `read_parquet` call, loading all the data (which is too big to fit in RAM) and thus crashing the workflow.
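To double-check where the filters end up, here is roughly how I’ve been inspecting the plan (just a sketch, assuming the dask-expr introspection helpers `.expr`, `.optimize()` and `.pprint()`; the path and filter here are placeholders):

import dask.dataframe as dd

# Placeholder path and filter; the real ones come from my workflow.
ddf = dd.read_parquet(
    "s3://bucket/path",
    filters=[("some_column", ">=", 0)],
)
ddf.expr.pprint()             # expression tree as written
ddf.optimize().expr.pprint()  # optimized tree: are the filters still on the ReadParquet node?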

My current workaround is to use the `to_legacy_dataframe` method this way:

dask.dataframe.read_parquet(
    ...,
    filters=[... some filters ...],
).to_legacy_dataframe().map_partitions(...).to_parquet()

I haven’t been able to really identify the root cause, though, so I thought I would share it here.

Thank you for your help.

Hi @Timost, welcome to Dask community!

Could you give more details about the filter you are using?

What makes you say that?

The Dask query optimizer should apply obvious filters by itself, but maybe there is something more complicated in your case?

Have you taken a look at Feedback - DataFrame query planning · Issue #10995 · dask/dask · GitHub?

Hi Guillaume,

Thank you for your answer, and sorry for the missing details '^^.

Some more context:

The data stored in the Parquet files I’m loading is essentially `(source_id, reference, value, other_columns)`, where `source_id` is a UInt64 index. It comes from a Dask DataFrame that was previously partitioned and sorted on `source_id` (using `set_index`) before being dumped to Parquet.
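For reference, the write side looks roughly like this (a simplified sketch; the paths are placeholders):

import dask.dataframe as dd

ddf = dd.read_parquet("s3://my/raw/input")
# Partition and sort on source_id: since the data ends up sorted on the
# index, each output file covers a contiguous source_id range, which is
# what should make min/max filters on source_id effective.
ddf = ddf.set_index("source_id")  # UInt64 column promoted to a sorted index
ddf.to_parquet("s3://my/object_storage/path")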

The filter I’m using is of the following form:

result_ddf = dask.dataframe.read_parquet(
    "s3://my/object_storage/path",
    storage_options=constants.OBJECT_STORAGE_OPTS,
    columns=["reference", "value"],
    filters=[
        ("source_id", "<=", other_df.index.max()),
        ("source_id", ">=", other_df.index.min()),
    ],
)

where `other_df` is a pandas DataFrame and `other_df.index.max()` is also a UInt64 value.
Of note: I’m filtering on the index, so maybe that explains the issue?

Sorry, I came across the doc stating that the optimizer will automatically repartition the dataframe, which misled me.

And now I realize that my first answer doesn’t accurately reproduce my workflow :bowing_woman:.

What it’s actually doing is:

df = dask.dataframe.read_parquet(
    ...,
    filters=[... some filters ...],
).map_partitions(
    ...
).compute()  # <<<< important edit here

Looking at:
https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.compute.html#dask_expr._collection.DataFrame.compute

It says that, for `compute`, the optimizer injects a repartition to a single partition, and I think it’s that repartition that drops the filters on the `read_parquet` call.
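If I understand correctly, that would make my `compute()` call roughly equivalent to the following, which is also how I tried to inspect the plan (a sketch; the identity `map_partitions` stands in for my real function):

# Per the doc above, compute() behaves like an injected repartition to a
# single partition before materializing the result:
explicit = result_ddf.map_partitions(lambda part: part).repartition(npartitions=1)
explicit.optimize().expr.pprint()  # does ReadParquet still carry the filters?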

I have. Should I report this problem on that issue?

Well, it should do that repartition after applying the filters…

Probably; your use case looks simple, and I’m not sure what the problem might be.

Alright,

I opened an issue here

Thanks again for all your answers. :+1:
