I feel my use case is very ordinary: I'm just trying to load a large dataset with from_map and read_csv, save it to Parquet files to avoid the loading time on subsequent runs, and create an index (I need fast random index access on small-memory machines).
I’m using a local cluster (8 workers, 8 threads per worker, 32GB max memory per worker).
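For completeness, the cluster is set up along these lines (the exact constructor call is my assumption, just matching the numbers above):

```python
from dask.distributed import Client, LocalCluster

# local cluster matching the numbers above: 8 workers x 8 threads,
# 32GB memory limit per worker (assumed constructor arguments)
cluster = LocalCluster(n_workers=8, threads_per_worker=8, memory_limit='32GB')
client = Client(cluster)
```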
The dataset consists of 10 gzip-compressed .tsv files, each about 2GB compressed (roughly 6GB uncompressed).
My approach is simple:
```python
import dask.dataframe as dd

def load_tsv(file):
    # note: this calls Dask's read_csv inside from_map
    df = dd.read_csv(file, sep='\t', dtype=meta_dict, compression='gzip')
    return df

ddf = dd.from_map(load_tsv, file_list)
ddf = ddf.repartition(npartitions=512)
ddf.to_parquet('./parquets', schema=meta_pa)
```
I'd expect this to write the Parquet files out before memory runs out, and npartitions=512 seems reasonable for a dataset of this size (~60GB uncompressed / 512 ≈ 120MB per partition). I also tried larger and smaller values, from 100 to 1000, but the dashboard always looks like:
and it leads to workers restarting over and over until the job is force-stopped on a timeout.
My first thought was the blocksize parameter of read_csv: at some point there was a warning saying that compressed files don't support blocksize, so each whole .tsv file would be loaded in one go. (Though loading a single whole file shouldn't consume that much memory anyway...) So I unzipped the files and read the uncompressed .tsv files instead, but then I got some EOF errors, which I believe come from how blocksize splits the files.
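For context, a minimal sketch of reading the unzipped files with an explicit blocksize (the path pattern and blocksize value are illustrative, not exactly what I ran):

```python
import dask.dataframe as dd

# read the already-unzipped .tsv files; an explicit blocksize splits
# each file into multiple partitions (value here is illustrative)
ddf = dd.read_csv('./unzipped/*.tsv', sep='\t', dtype=meta_dict, blocksize='128MB')
ddf.to_parquet('./parquets', schema=meta_pa)
```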
Then I noticed in the docs (dask.dataframe.from_map — Dask documentation) that the example uses pandas.read_csv rather than Dask's read_csv. After I switched, workers still restarted a few times from OOM, but the job survived to the end.
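Roughly what the switched version looks like (same meta_dict, meta_pa, and file_list as above):

```python
import pandas as pd
import dask.dataframe as dd

def load_tsv(file):
    # plain pandas read: each call materializes one file as one partition
    return pd.read_csv(file, sep='\t', dtype=meta_dict, compression='gzip')

ddf = dd.from_map(load_tsv, file_list)
ddf = ddf.repartition(npartitions=512)
ddf.to_parquet('./parquets', schema=meta_pa)
```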
So what puzzles me now is: what is the difference between using pd.read_csv and dd.read_csv here? My initial thought was that Dask would handle the "cold loading" and GC according to the partition size, so it should cope with OOM better than pd.read_csv does (thinking of a single file larger than memory).