I have a Dask DataFrame with two partitions that was created by reading and combining a number of CSV files. I am able to save the DataFrame to a Parquet file and reload it, except when I add the following line before calling to_parquet():
merged_df['Seq'] = merged_df.groupby('event_no_trip').cumcount()+1
When that line is included, I get the stack trace below. I am guessing that this has something to do with using groupby() across more than one partition, but I have no idea how to fix it. Could anyone please provide some insight here? Thanks for your help.
merged_df.to_parquet(fullFolderPath)
File "C:\venviron\Lib\site-packages\dask_expr\_collection.py", line 3108, in to_parquet
return to_parquet(self, path, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\io\parquet.py", line 463, in to_parquet
df.to_dask_dataframe(),
^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_collection.py", line 1252, in to_dask_dataframe
df = self.optimize(**optimize_kwargs) if optimize else self
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_collection.py", line 482, in optimize
return new_collection(self.expr.optimize(fuse=fuse))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_expr.py", line 93, in optimize
return optimize(self, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_expr.py", line 2876, in optimize
return optimize_until(expr, stage)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_expr.py", line 2842, in optimize_until
expr = expr.simplify()
^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_core.py", line 360, in simplify
new = expr.simplify_once(dependents=dependents, simplified={})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_core.py", line 338, in simplify_once
new = operand.simplify_once(
^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_core.py", line 338, in simplify_once
new = operand.simplify_once(
^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_core.py", line 338, in simplify_once
new = operand.simplify_once(
^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_core.py", line 321, in simplify_once
out = child._simplify_up(expr, dependents)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\venviron\Lib\site-packages\dask_expr\_shuffle.py", line 109, in _simplify_up
new_projection = [
^
File "C:\venviron\Lib\site-packages\dask_expr\_shuffle.py", line 112, in <listcomp>
if (col in partitioning_index or col in projection)
^^^^^^^^^^^^^^^^^
TypeError: argument of type 'int' is not iterable