What changed in the latest release with the default to use dask-expr?

(a) It seems that if you import dask.dataframe, you now need dask-expr in your environment.
(b) I'm now getting errors and would love to see the release notes to figure out what changed, but I don't see where the release notes are… :confused:

Basically, if I call dask.config.set({'dataframe.query-planning': False}), everything works. So there's a behavior change in how Dask DataFrames are constructed when dask-expr is used. Where can I learn more about that?
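For example, the type of a freshly constructed DataFrame has changed (a minimal check on my end, assuming dask-expr is installed):

import pandas as pd
import dask.dataframe as dd

# With query planning enabled (the new default), this prints a
# dask_expr class rather than the legacy dask.dataframe.core.DataFrame
ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=1)
print(type(ddf))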

As you mentioned, the 2024.3.0 release added dask-expr as a dependency. Installing with pip or conda,

pip install "dask[dataframe]"

or

conda install dask

should pull in the appropriate dependencies automatically. Are you installing in a different way?
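For example, to sanity-check what ended up in your environment (assuming both packages import cleanly):

import dask
import dask_expr

# dask[dataframe] >= 2024.3.0 should pull in a compatible dask-expr
print(dask.__version__, dask_expr.__version__)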

but I don't see where the release notes are

Thanks for pointing that out. It looks like our docs build failed, so I've retriggered it, and the changelog entry for 2024.3.0 is now available here: Changelog — Dask documentation.

I’m now getting errors and would love to see the release notes to figure out what changed

Sorry to hear that. Please open an issue over on the Dask issue tracker (Issues · dask/dask · GitHub) and we'll help figure out what the underlying issue is.

As mentioned in the changelog, the new query planning functionality can be disabled with

import dask
dask.config.set({'dataframe.query-planning': False})

which may be useful as a temporary workaround.
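If it's easier to control from the environment (e.g. in CI), the same option can be set through Dask's environment-variable config; note that it has to be in place before dask.dataframe is first imported, since the flag is read at import time:

import os

# Equivalent to dask.config.set({'dataframe.query-planning': False});
# must be set before the first `import dask.dataframe`
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import dask.dataframe as dd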


Yep, I have that as a workaround to turn it off, thanks. I also opened "Docs on how to transition to using dask-expr -- given that it's now the default for dataframes in the new dask release" (Issue #968 · dask/dask-expr · GitHub) to ask for docs on the behavior changes.


I'm experiencing what may be a related issue. With dask-expr installed, it happens with release 2024.3.0 and later. I am using dask.dataframe.read_parquet to read from a directory on S3 that contains multiple parquet files, on Python 3.10.14 [GCC 10.2.1 20210110] on a Linux machine (and I see the same thing with Python 3.11). If I include dask.config.set({"dataframe.query-planning": False}) in my code, the issue goes away, but that's not a good long-term solution, as I presume query planning will eventually be mandatory.

I am running the following:

import dask.dataframe as dd

# s3_input_path is an s3:// directory of parquet files,
# filter_expression is a pyarrow filter expression, and
# pyarrow_s3_file_system is a pyarrow.fs.S3FileSystem instance
ddf = dd.read_parquet(
    path=s3_input_path,
    filters=filter_expression,
    filesystem=pyarrow_s3_file_system,
)
ddf.compute()

This throws the following on the ddf.compute() call:

File /usr/local/lib/python3.11/site-packages/dask_expr/_collection.py:474, in FrameBase.compute(self, fuse, **kwargs)
    472 if not isinstance(out, Scalar):
    473     out = out.repartition(npartitions=1)
--> 474 out = out.optimize(fuse=fuse)
    475 return DaskMethodsMixin.compute(out, **kwargs)

File /usr/local/lib/python3.11/site-packages/dask_expr/_collection.py:589, in FrameBase.optimize(self, fuse)
    571 def optimize(self, fuse: bool = True):
    572     """Optimizes the DataFrame.
    573
    574     Runs the optimizer with all steps over the DataFrame and wraps the result in a
   (...)
    587         The optimized Dask Dataframe
    588     """
--> 589     return new_collection(self.expr.optimize(fuse=fuse))

File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:94, in Expr.optimize(self, **kwargs)
     93 def optimize(self, **kwargs):
---> 94     return optimize(self, **kwargs)

File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:3009, in optimize(expr, fuse)
   2988 """High level query optimization
   2989
   2990 This leverages three optimization passes:
   (...)
   3005 optimize_blockwise_fusion
   3006 """
   3007 stage: core.OptimizerStage = "fused" if fuse else "simplified-physical"
-> 3009 return optimize_until(expr, stage)

File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:2965, in optimize_until(expr, stage)
   2962     return expr
   2964 # Manipulate Expression to make it more efficient
-> 2965 expr = expr.rewrite(kind="tune")
   2966 if stage == "tuned-logical":
   2967     return expr

File /usr/local/lib/python3.11/site-packages/dask_expr/_core.py:263, in Expr.rewrite(self, kind)
    261 # Allow children to rewrite their parents
    262 for child in expr.dependencies():
--> 263     out = getattr(child, up_name)(expr)
    264     if out is None:
    265         out = expr

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:965, in ReadParquetPyarrowFS._tune_up(self, parent)
    964 def _tune_up(self, parent):
--> 965     if self._fusion_compression_factor >= 1:
    966         return
    967     if isinstance(parent, FusedParquetIO):

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1006, in ReadParquetPyarrowFS._fusion_compression_factor(self)
   1004 @property
   1005 def _fusion_compression_factor(self):
-> 1006     approx_stats = self.approx_statistics()
   1007     total_uncompressed = 0
   1008     after_projection = 0

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:782, in ReadParquetPyarrowFS.approx_statistics(self)
    780 files_to_consider = np.array(self._dataset_info["all_files"])[idxs]
    781 stats = [_STATS_CACHE[tokenize(finfo)] for finfo in files_to_consider]
--> 782 return _combine_stats(stats)

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1859, in _combine_stats(stats)
   1851 """Combine multiple file-level statistics into a single dict of metrics that
   1852 represent the average values of the parquet statistics"""
   1853 agg_cols = {
   1854     "total_compressed_size": statistics.mean,
   1855     "total_uncompressed_size": statistics.mean,
   1856     "path_in_schema": lambda x: set(x).pop(),
   1857 }
   1858 return _agg_dicts(
-> 1859     _aggregate_statistics_to_file(stats),
   1860     {
   1861         "num_rows": statistics.mean,
   1862         "num_row_groups": statistics.mean,
   1863         "serialized_size": statistics.mean,
   1864         "total_byte_size": statistics.mean,
   1865         "columns": partial(_aggregate_columns, agg_cols=agg_cols),
   1866     },
   1867 )

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1822, in _aggregate_statistics_to_file(stats)
   1819     file_stat = file_stat.copy()
   1820     aggregated_stats.append(file_stat)
-> 1822     file_stat.update(_agg_dicts(file_stat.pop("row_groups"), agg_func))
   1823 return aggregated_stats

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
   1777     agg = agg_funcs.get(k)
   1778     if agg:
-> 1779         result2[k] = agg(v)
   1780 return result2

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1796, in _aggregate_columns(cols, agg_cols)
   1794         break
   1795     i += 1
-> 1796 return [_agg_dicts(c, agg_cols) for c in combine]

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1796, in <listcomp>(.0)
   1794         break
   1795     i += 1
-> 1796 return [_agg_dicts(c, agg_cols) for c in combine]

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
   1777     agg = agg_funcs.get(k)
   1778     if agg:
-> 1779         result2[k] = agg(v)
   1780 return result2

File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
   1777     agg = agg_funcs.get(k)
   1778     if agg:
-> 1779         result2[k] = agg(v)
   1780 return result2

TypeError: '<' not supported between instances of 'NoneType' and 'NoneType'

It seems the issue might be related to the parquet file statistics somehow. I tried a few variations of calling dd.read_parquet, but no luck.
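In case it helps narrow things down, here is a sketch (the file name is a placeholder) of how I've been inspecting whether the parquet files are missing row-group column statistics, which is what the failing _combine_stats pass appears to be aggregating:

import pyarrow.parquet as pq

# "part.0.parquet" stands in for one of the files under the S3 prefix
md = pq.ParquetFile("part.0.parquet").metadata
for rg in range(md.num_row_groups):
    for col in range(md.num_columns):
        chunk = md.row_group(rg).column(col)
        # chunk.statistics is None when the writer omitted column stats
        print(rg, chunk.path_in_schema, chunk.statistics)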

Thanks in advance for any help

Hi @efranco6363, welcome to the Dask community!

I would recommend, as @jrbourbeau mentioned, opening an issue on the Dask issue tracker.

For the time being, please disable query planning for your use case.

Thanks @guillaumeeb! I added to this existing issue, but I can open a new one if you think that's best.