I’m experiencing what may be the same issue. With dask-expr
installed, the issue happens with release 2024.3.0
and later. I am using dask.dataframe.read_parquet
to read from a directory on S3 that contains multiple parquet files. I am running on Python 3.10.14 [GCC 10.2.1 20210110]
on a linux machine (and see this happen with Python 3.11 as well). If I include dask.config.set({"dataframe.query-planning": False})
in my code, the issue goes away, but this is not a good long-term solution, as I presume query-planning
will eventually be forced.
I am running the following:
import dask.dataframe as dd
ddf = dd.read_parquet(
path=s3_input_path,
filters=filter_expression,
filesystem=pyarrow_s3_file_system,
)
ddf.compute()
Which throws the following on the ddf.compute()
call:
File /usr/local/lib/python3.11/site-packages/dask_expr/_collection.py:474, in FrameBase.compute(self, fuse, **kwargs)
472 if not isinstance(out, Scalar):
473 out = out.repartition(npartitions=1)
--> 474 out = out.optimize(fuse=fuse)
475 return DaskMethodsMixin.compute(out, **kwargs)
File /usr/local/lib/python3.11/site-packages/dask_expr/_collection.py:589, in FrameBase.optimize(self, fuse)
571 def optimize(self, fuse: bool = True):
572 """Optimizes the DataFrame.
573
574 Runs the optimizer with all steps over the DataFrame and wraps the result in a
(...)
587 The optimized Dask Dataframe
588 """
--> 589 return new_collection(self.expr.optimize(fuse=fuse))
File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:94, in Expr.optimize(self, **kwargs)
93 def optimize(self, **kwargs):
---> 94 return optimize(self, **kwargs)
File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:3009, in optimize(expr, fuse)
2988 """High level query optimization
2989
2990 This leverages three optimization passes:
(...)
3005 optimize_blockwise_fusion
3006 """
3007 stage: core.OptimizerStage = "fused" if fuse else "simplified-physical"
-> 3009 return optimize_until(expr, stage)
File /usr/local/lib/python3.11/site-packages/dask_expr/_expr.py:2965, in optimize_until(expr, stage)
2962 return expr
2964 # Manipulate Expression to make it more efficient
-> 2965 expr = expr.rewrite(kind="tune")
2966 if stage == "tuned-logical":
2967 return expr
File /usr/local/lib/python3.11/site-packages/dask_expr/_core.py:263, in Expr.rewrite(self, kind)
261 # Allow children to rewrite their parents
262 for child in expr.dependencies():
--> 263 out = getattr(child, up_name)(expr)
264 if out is None:
265 out = expr
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:965, in ReadParquetPyarrowFS._tune_up(self, parent)
964 def _tune_up(self, parent):
--> 965 if self._fusion_compression_factor >= 1:
966 return
967 if isinstance(parent, FusedParquetIO):
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1006, in ReadParquetPyarrowFS._fusion_compression_factor(self)
1004 @property
1005 def _fusion_compression_factor(self):
-> 1006 approx_stats = self.approx_statistics()
1007 total_uncompressed = 0
1008 after_projection = 0
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:782, in ReadParquetPyarrowFS.approx_statistics(self)
780 files_to_consider = np.array(self._dataset_info["all_files"])[idxs]
781 stats = [_STATS_CACHE[tokenize(finfo)] for finfo in files_to_consider]
--> 782 return _combine_stats(stats)
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1859, in _combine_stats(stats)
1851 """Combine multiple file-level statistics into a single dict of metrics that
1852 represent the average values of the parquet statistics"""
1853 agg_cols = {
1854 "total_compressed_size": statistics.mean,
1855 "total_uncompressed_size": statistics.mean,
1856 "path_in_schema": lambda x: set(x).pop(),
1857 }
1858 return _agg_dicts(
-> 1859 _aggregate_statistics_to_file(stats),
1860 {
1861 "num_rows": statistics.mean,
1862 "num_row_groups": statistics.mean,
1863 "serialized_size": statistics.mean,
1864 "total_byte_size": statistics.mean,
1865 "columns": partial(_aggregate_columns, agg_cols=agg_cols),
1866 },
1867 )
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1822, in _aggregate_statistics_to_file(stats)
1819 file_stat = file_stat.copy()
1820 aggregated_stats.append(file_stat)
-> 1822 file_stat.update(_agg_dicts(file_stat.pop("row_groups"), agg_func))
1823 return aggregated_stats
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
1777 agg = agg_funcs.get(k)
1778 if agg:
-> 1779 result2[k] = agg(v)
1780 return result2
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1796, in _aggregate_columns(cols, agg_cols)
1794 break
1795 i += 1
-> 1796 return [_agg_dicts(c, agg_cols) for c in combine]
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1796, in <listcomp>(.0)
1794 break
1795 i += 1
-> 1796 return [_agg_dicts(c, agg_cols) for c in combine]
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
1777 agg = agg_funcs.get(k)
1778 if agg:
-> 1779 result2[k] = agg(v)
1780 return result2
File /usr/local/lib/python3.11/site-packages/dask_expr/io/parquet.py:1779, in _agg_dicts(dicts, agg_funcs)
1777 agg = agg_funcs.get(k)
1778 if agg:
-> 1779 result2[k] = agg(v)
1780 return result2
TypeError: '<' not supported between instances of 'NoneType' and 'NoneType'
It seems that the issue might be related to the file statistics somehow. I tried a few options of calling dd.read_parquet
differently, but with no luck.
Thanks in advance for any help.