Slow processing of parquet dataset using the distributed client

I’m running a few operations on a parquet dataset with 144 files that are on average ~32MB each.

import dask.dataframe as dd

df = dd.read_parquet("./data_folder/")
df = df.loc[df["Value"] != "INVALID"]
df["Signal"] = df["Signal"].str.split(".", n=4)
df["Device"] = df["Signal"].str[1]
df["Column"] = df["Signal"].str[2:].str.join(".")
df = df.drop(columns=["Signal"])
df.to_parquet("output")

When I run this operation on only one of the files, it takes about 17 seconds total. When I run it on 4 of the files, it takes about 50 seconds, even though there are 4 workers, each with 4GB of RAM available. When I try to run it on all 144 files, it takes over 9 minutes just for tasks to start appearing in the Task Stream and over an hour to finish the processing. This was all without a "_metadata" file; I tried again after creating one, but there wasn't much improvement.

Is there a reason 4 files take so much longer than just one? If it takes 17 seconds to run this process on a single file, I'd expect all 144 to finish in about 10 minutes. I'm thinking about running the tasks in regular pandas with a Python multiprocessing pool if I can't figure out why the Dask client takes so long to start processing the files.


Everything between the load and the save is a partition-wise operation, and these should be fused efficiently into a single task per partition. It appears that isn't happening for you, which is worth investigating. In the meantime, you can be explicit about it with map_partitions:

def process(df):
    # Runs once per partition as plain pandas
    df = df.loc[df["Value"] != "INVALID"]
    df["Signal"] = df["Signal"].str.split(".", n=4)
    df["Device"] = df["Signal"].str[1]
    df["Column"] = df["Signal"].str[2:].str.join(".")
    return df.drop(columns=["Signal"])

df.map_partitions(process).to_parquet("output")

and see how you fare. Note that string and list operations don’t release the GIL, so you should use processes over threads for this (threads_per_worker=1).
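
For reference, something like the following is how you could set up such a client locally. This is only a sketch; the worker count and memory limit simply mirror the numbers in your post rather than being recommended values:

from dask.distributed import Client
import dask.dataframe as dd

# Separate worker processes so GIL-bound string work can run in parallel;
# one thread per process. Values below are illustrative (4 workers, 4GB each).
client = Client(n_workers=4, threads_per_worker=1, memory_limit="4GB")

df = dd.read_parquet("./data_folder/")
df.map_partitions(process).to_parquet("output")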