I have a 1.7 TB CSV file that I need to filter, keeping only rows whose `ID` column appears in a predetermined list of 135,000,000 IDs, and saving the result as a standalone CSV. I ran the code below and, eight days later, it is unfortunately still running. I'm not entirely clear why it is taking so long, and I'm worried that if I stop it now it might be close to finishing…
```python
import dask.dataframe as dd

def preprocess_df(input_df):
    mask = input_df['ID'].isin(list_IDs)
    return input_df[mask]

df = dd.read_csv(file_path, usecols=['ID', 'data1'], blocksize='50MB')
df_filtered = df.map_partitions(preprocess_df, meta=df)
df_filtered.to_csv('output.csv', index=False, single_file=True)
```
Please help!
Hi @alan-dimitriev, welcome to Dask community!
First, are you doing this on a distributed cluster? Or only running Dask with the default Dataframe threaded Scheduler? On which infrastructure, which disk type? 1.7TB is huge for a single CSV! The predetermined list of IDs is also really big!
Did you try to run the Pandas code on a subset on your dataset to see how much time it was taking?
Also, writing the results to a single file will cause file locking and increase the run time.
Hey Guillaume,
I appreciate you responding! I am new to this type of computing so your experience is very valuable!
> First, are you doing this on a distributed cluster? Or only running Dask with the default Dataframe threaded Scheduler?
I tried setting up a cluster initially but ran into numerous memory issues so I just started working with the default Dataframe threaded scheduler.
> On which infrastructure, which disk type?
I’m working on a remote Linux system with 8 CPUs, 3 HDDs, 70 GB of memory, and 303 GB of swap.
> 1.7TB is huge for a single CSV! The predetermined list of IDs is also really big!
For reasons which I both understand and bemoan this was allegedly the best way to store the data.
> Did you try to run the Pandas code on a subset on your dataset to see how much time it was taking?
I ended up using Pandas, reading the CSV in chunks with a chunk size of 1 million rows, to do what I needed. It took 24 hours lol
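For the record, the chunked-Pandas approach I used looked roughly like this (the paths, the `filter_csv_in_chunks` name, and the column names are illustrative, following the example in my original post):

```python
import pandas as pd

def filter_csv_in_chunks(input_path, output_path, keep_ids, chunksize=1_000_000):
    """Stream the CSV in chunks, keeping only rows whose ID is in keep_ids."""
    keep_ids = set(keep_ids)  # set membership is O(1); a plain list would be O(n) per row
    first = True
    for chunk in pd.read_csv(input_path, usecols=['ID', 'data1'],
                             chunksize=chunksize):
        filtered = chunk[chunk['ID'].isin(keep_ids)]
        # Write the header only for the first chunk, then append.
        filtered.to_csv(output_path, mode='w' if first else 'a',
                        header=first, index=False)
        first = False
```

Converting the ID list to a `set` before the loop made a noticeable difference, since `isin` against a large list is much slower.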
I am still very curious how I could use Dask to optimize this process, in case I am ever faced with it again in the future. How do you set it up so that the output is written to multiple files to avoid locking?
I am new to dask and was wondering if you had any resources you like to use or any repositories that are good demonstrations of implementation?
Thanks!
I imagine your process is mainly IO bound, so in this case I would start a LocalCluster with 8 workers and 1 thread per worker, and then try your code again without the `single_file=True`
kwarg in `to_csv`. I would maybe also increase the blocksize a bit (to 512MB, say?) to avoid ending up with too many CSV files.
But there are probably other ways than Dask, maybe with plain shell tools, to perform this transformation efficiently.