Hello,
I’ve heard about the dask parquet improvements and wanted to try a few things out. In many cases performance is much, much better, but dask still seems to be very slow for certain operations.
For example, I have a ~2 TB parquet dataset with about 30K partitions. It’s imbalanced: column y has many more 0s than 1s. The goal of the following function is to downsample the zeros.
```python
import dask.dataframe as dd


def downsample(df: dd.DataFrame) -> dd.DataFrame:
    # Keep all positives, but only a small random sample of the negatives.
    pos_idx = df["y"] == 1
    df_pos = df.loc[pos_idx]
    df_neg = df.loc[~pos_idx].sample(frac=1e-4)
    df = dd.concat([df_pos, df_neg], axis=0)
    # Collapse the result into far fewer partitions.
    df = df.repartition(npartitions=100)
    # Re-sort by the "index" column, then move it back out of the index.
    df = df.set_index("index").reset_index()
    return df
```
When I call this function on a dataframe created by `dd.read_parquet` and save the output as another parquet file with `to_parquet`, the process takes about 40 minutes on a cluster of five 64-core machines. Is there any way to speed it up?
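For reference, the end-to-end call looks roughly like this (the paths are placeholders, not my real locations):

```python
import dask.dataframe as dd

# Placeholder paths; the real dataset is ~2 TB across ~30K partitions.
df = dd.read_parquet("s3://bucket/input/")
out = downsample(df)
out.to_parquet("s3://bucket/output/")
```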
For comparison, if I change all the dask calls in the `downsample` function to pandas calls, and I use ray to parallelize a process where I load a partition, run `downsample` on it, and save it to a new parquet file, it takes a few seconds on the same cluster.
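For concreteness, the ray version is roughly the following sketch; the `downsample_partition` helper, the glob pattern, and the paths are placeholders for illustration, not my exact code:

```python
import glob
import os

import pandas as pd
import ray

ray.init()  # connect to the cluster (configuration omitted)


@ray.remote
def downsample_partition(path: str, out_dir: str) -> str:
    # Pandas version of the same logic, applied to a single partition file.
    df = pd.read_parquet(path)
    pos_idx = df["y"] == 1
    out = pd.concat(
        [df.loc[pos_idx], df.loc[~pos_idx].sample(frac=1e-4)], axis=0
    )
    out_path = os.path.join(out_dir, os.path.basename(path))
    out.to_parquet(out_path)
    return out_path


# One ray task per input partition file (placeholder paths).
paths = sorted(glob.glob("/data/input/*.parquet"))
futures = [downsample_partition.remote(p, "/data/output") for p in paths]
ray.get(futures)
```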