Dask very slow with simple processing of large parquet file

Hello,

I’ve heard about the dask parquet improvements and wanted to try a few things out. In many cases performance is much, much better, but dask still seems very slow at some operations.

For example, I have a ~2TB parquet file with about 30K partitions. It’s an imbalanced dataset where column y has many more 0’s than 1’s. In the following function, my goal is to downsample the zeros.

import dask.dataframe as dd

def downsample(df: dd.DataFrame) -> dd.DataFrame:
    # keep every positive example, sample 0.01% of the negatives
    pos_idx = df["y"] == 1
    df_pos = df.loc[pos_idx]
    df_neg = df.loc[~pos_idx].sample(frac=1e-4)
    df = dd.concat([df_pos, df_neg], axis=0)
    df = df.repartition(npartitions=100)
    # restore the original row order via the "index" column
    df = df.set_index("index").reset_index()
    return df

When I call this function on a dataframe created by dd.read_parquet and save the output as another parquet file with to_parquet, the process takes about 40 minutes on a cluster of five 64-core machines. Is there any way to speed it up?
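Concretely, the end-to-end call looks roughly like this (the paths are just placeholders):

import dask.dataframe as dd

df = dd.read_parquet("s3://bucket/data/")  # ~2TB, ~30K partitions
df = downsample(df)
df.to_parquet("s3://bucket/data_downsampled/")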

For comparison, if I change all the dask calls in downsample to pandas calls and use Ray to parallelize the per-partition work (load a partition, run downsample on it, save it to a new parquet file), the whole job takes a few seconds on the same cluster.
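The Ray version is roughly the following sketch; the file naming and the way the ~30K part files are enumerated are placeholders:

import pandas as pd
import ray

ray.init()

@ray.remote
def process_partition(in_path: str, out_path: str) -> None:
    # a single partition fits in memory, so plain pandas is enough
    df = pd.read_parquet(in_path)
    pos_idx = df["y"] == 1
    df_pos = df.loc[pos_idx]
    df_neg = df.loc[~pos_idx].sample(frac=1e-4)
    pd.concat([df_pos, df_neg], axis=0).to_parquet(out_path)

# placeholder listing of input/output part files
paths = [(f"in/part.{i}.parquet", f"out/part.{i}.parquet") for i in range(30_000)]
ray.get([process_partition.remote(i, o) for i, o in paths])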

What does .set_index().reset_index() achieve? It is probably expensive, especially given that you just repartitioned; you are probably shuffling the data multiple times here.

Your cluster dashboard will tell you where the time is actually being spent, so that’s a good place to start.

The first few operations amount to a partition-wise map(), but together they may be too complex for dask to fuse automatically. Going partition-wise explicitly:

import pandas as pd

def op(df: pd.DataFrame) -> pd.DataFrame:
    # same filtering and sampling as before, but with plain pandas
    pos_idx = df["y"] == 1
    df_pos = df.loc[pos_idx]
    df_neg = df.loc[~pos_idx].sample(frac=1e-4)
    return pd.concat([df_pos, df_neg], axis=0)

df2 = df.map_partitions(op)  # pass meta=, if easy to know
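The repartition and the write can then stay at the dask level as before (the output path is a placeholder):

df2 = df2.repartition(npartitions=100)
df2.to_parquet("downsampled.parquet")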

Thanks, Martin. The .set_index().reset_index() puts the data back in its original order; I want to avoid having all the positive examples at the head and all the negative examples at the tail of the data frame.
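If within-partition order is enough, I suppose the reordering can even move inside op itself via sort_index, avoiding a global shuffle entirely. A sketch, assuming the index still reflects the original row order:

import pandas as pd

def op(df: pd.DataFrame) -> pd.DataFrame:
    pos_idx = df["y"] == 1
    df_pos = df.loc[pos_idx]
    df_neg = df.loc[~pos_idx].sample(frac=1e-4)
    # sort_index puts positives and sampled negatives back into their
    # original relative order within this partition
    return pd.concat([df_pos, df_neg], axis=0).sort_index()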

Will use map_partitions instead. Thanks!