Read/Filter CSV taking 7+ Days

I have a 1.7TB CSV file that I need to filter, keeping only the rows whose Column ID is in a predetermined list of about 135,000,000 IDs, and save the result as a standalone CSV. I ran my code below and unfortunately it is still running eight days later. I’m not entirely clear on why it is taking so long, and I’m worried that if I stop it now it might have been close to finishing…

import dask.dataframe as dd

# list_IDs holds the ~135,000,000 predetermined IDs to keep
def preprocess_df(input_df):
    mask = input_df['ID'].isin(list_IDs)
    return input_df[mask]

df = dd.read_csv(file_path, usecols=['ID', 'data1'], blocksize='50MB')
df_filtered = df.map_partitions(preprocess_df, meta=df)
df_filtered.to_csv('output.csv', index=False, single_file=True)

Please help!

Hi @alan-dimitriev, welcome to the Dask community!

First, are you doing this on a distributed cluster? Or only running Dask with the default Dataframe threaded Scheduler? On which infrastructure, which disk type? 1.7TB is huge for a single CSV! The predetermined list of IDs is also really big!

Did you try to run the Pandas code on a subset on your dataset to see how much time it was taking?

Also, writing the results to a single file serializes the output (every partition has to wait for a lock on that one file), which will increase the run time.
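
As a quick sketch of the multi-file alternative (reusing the df_filtered from your snippet), you can let Dask write one CSV per partition in parallel:

# Writes one file per partition: output-0.csv, output-1.csv, ...
df_filtered.to_csv('output-*.csv', index=False)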

Hey Guillaume,

I appreciate you responding! I am new to this type of computing so your experience is very valuable!

First, are you doing this on a distributed cluster? Or only running Dask with the default Dataframe threaded Scheduler?

I tried setting up a cluster initially but ran into numerous memory issues so I just started working with the default Dataframe threaded scheduler.

On which infrastructure, which disk type?

I’m working on a remote Linux system with 8 CPUs, 3 HDDs, and 70 GB of total memory, plus 303 GB of swap.

1.7TB is huge for a single CSV! The predetermined list of IDs is also really big!

For reasons which I both understand and bemoan this was allegedly the best way to store the data.

Did you try to run the Pandas code on a subset on your dataset to see how much time it was taking?

I ended up using pandas and reading the CSV in chunks with a chunk size of 1 million rows to do what I needed. It took about 24 hours, lol.
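
Roughly what that looked like, for reference (just a sketch; file_path and list_IDs stand in for my actual path and ID list):

import pandas as pd

id_set = set(list_IDs)  # set membership tests are much faster than a huge list

first_chunk = True
# Stream the CSV in 1,000,000-row chunks so only one chunk is in memory at a time
for chunk in pd.read_csv(file_path, usecols=['ID', 'data1'], chunksize=1_000_000):
    filtered = chunk[chunk['ID'].isin(id_set)]
    # Append each filtered chunk to a single output file, writing the header only once
    filtered.to_csv('output.csv', mode='w' if first_chunk else 'a',
                    header=first_chunk, index=False)
    first_chunk = False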

I am still very curious about how I can use Dask to optimize this process, in case I am ever faced with it in the future. How do you set it up so that the output is written to multiple files to avoid locking?

I am new to Dask and was wondering if you have any resources you like, or any repositories that are good demonstrations of how to implement this?

Thanks!

  • Alan

I imagine your process is mainly IO bound, so in this case I would start a LocalCluster with 8 workers and 1 thread per worker, and then try your code again but remove the single_file=True kwarg in to_csv. I would also try increasing blocksize a bit (512MB, say) to avoid ending up with too many output CSV files.
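
Putting that together, a rough sketch (file_path and list_IDs are placeholders for your own objects):

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# 8 single-threaded workers: one process per CPU
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)

id_set = set(list_IDs)  # set membership tests are much faster than a 135M-element list

def keep_wanted_ids(part):
    return part[part['ID'].isin(id_set)]

# Larger blocks mean fewer, bigger partitions and fewer output files
df = dd.read_csv(file_path, usecols=['ID', 'data1'], blocksize='512MB')
df_filtered = df.map_partitions(keep_wanted_ids, meta=df)

# Without single_file=True, each partition is written to its own CSV in parallel
df_filtered.to_csv('output-*.csv', index=False)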

But there are probably other ways than Dask, maybe with plain shell tools, to perform this transformation efficiently.