Using dask's read_csv or pandas's read_csv in from_map?

I feel my use case is very ordinary: I'm just trying to load a large dataset with from_map and read_csv and save it to Parquet files, both to save loading time and to create an index (I need fast random index access on small-memory machines).
I’m using a local cluster (8 workers, 8 threads per worker, 32GB max memory per worker).
The dataset I'm reading consists of 10 gzip-compressed .tsv files, each about 2 GB (around 6 GB uncompressed).

My approach is simple:
```python
import dask.dataframe as dd

def load_tsv(file):
    # note: Dask's read_csv, not pandas'
    df = dd.read_csv(file, sep="\t", dtype=meta_dict, compression="gzip")
    return df

ddf = dd.from_map(load_tsv, file_list)
ddf = ddf.repartition(npartitions=512)
ddf.to_parquet("./parquets", schema=meta_pa)
```
My expectation was that the Parquet files would be written out before memory was exhausted, and that npartitions=512 should be suitable for a dataset of this size. I also tried larger and smaller values, from 100 to 1000, but the dashboard always looks like this:
[dashboard screenshot]
and the workers keep restarting until they are forcibly stopped after a timeout.

My first thought was the blocksize parameter in read_csv: at some point there was a warning saying that compressed files do not support blocksize and that the whole .tsv file would be loaded. (Though loading a whole file shouldn't consume that much memory anyway…) I then unzipped them and read the uncompressed .tsv files, but this time I got some EOF errors, which I believe are due to the handling of blocksize.

Then I noticed that the docs (dask.dataframe.from_map — Dask documentation) use pandas.read_csv instead of Dask's read_csv. I switched to that, and although workers still restarted a few times due to OOM, the job survived to the end.

So it puzzles me now: what is the difference between using pd.read_csv and dd.read_csv here? My initial thought was that Dask would handle the lazy loading and garbage collection according to the partition size, so it should cope with OOM issues better than pd.read_csv (thinking of a single file larger than memory).

Hi @krg_lan, welcome to Dask Discourse forum!

If you use from_map, the mapped function is not expected to return a Dask DataFrame: it should return partitions of a Dask DataFrame, i.e. pandas DataFrames. I'm not sure what the behavior is when trying to build a Dask DataFrame from Dask DataFrame partitions…
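Something like this minimal sketch (reusing the file_list and meta_dict names from your snippet):

```python
import pandas as pd
import dask.dataframe as dd

def load_tsv(file):
    # Each call builds one partition of the Dask DataFrame,
    # so it returns a plain pandas DataFrame
    return pd.read_csv(file, sep="\t", dtype=meta_dict, compression="gzip")

ddf = dd.from_map(load_tsv, file_list)
```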

That said, why don't you just use read_csv directly on all your files? You shouldn't need from_map here.

Next, repartitioning can be expensive, especially if you are creating an index. I don't see that in your code, though?

Maybe you should first try to read the files and write them as Parquet without repartitioning, to better understand where the problem comes from.
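For example, a rough sketch (assuming your files match a glob like './data/*.tsv.gz'; the path is a placeholder):

```python
import dask.dataframe as dd

# blocksize cannot be used with gzip, so each file becomes a single partition
ddf = dd.read_csv(
    "./data/*.tsv.gz",
    sep="\t",
    dtype=meta_dict,        # same dtype mapping as in your snippet
    compression="gzip",
)

# Write straight to Parquet, without repartitioning first
ddf.to_parquet("./parquets", schema=meta_pa)
```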

Hi @guillaumeeb , thank you for your reply!

I see. For my case, just using read_csv in a simple loop, creating the index, (optionally) repartitioning, and then calling to_parquet (while taking care of the filename formats) should work, something like the sketch below.
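This is only one possible reading of that plan, using a single dd.read_csv over all the uncompressed files instead of a literal per-file loop, with a hypothetical "id" column as the index and to_parquet's name_function for the filename format:

```python
import dask.dataframe as dd

# Read all the uncompressed .tsv files at once
ddf = dd.read_csv("./data/*.tsv", sep="\t", dtype=meta_dict)

# set_index sorts the data so that fast random access by index works later
ddf = ddf.set_index("id")

# Optionally: ddf = ddf.repartition(partition_size="100MB")

# name_function controls the filename of each output partition
ddf.to_parquet(
    "./parquets",
    schema=meta_pa,
    name_function=lambda i: f"part-{i:04d}.parquet",
)
```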

What would be the use case for the approach described in the last part of from_map's doc?

I'm not sure I fully understand your last question. The documentation seems clear to me:

from_map is the preferred option when reading from data sources that are not natively supported by Dask or if the data source requires custom handling before handing things off to Dask DataFrames. Examples are things like binary files or other unstructured data that doesn't have an IO connector.

Since from_map allows you to map an arbitrary function to any number of iterable objects, it can be a very convenient means of implementing functionality that may be missing from other DataFrame-creation methods. For example, if you happen to have a priori knowledge about the number of rows in each of the files in a dataset, you can generate a DataFrame collection with a global RangeIndex.
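For example, a rough sketch of that RangeIndex idea (the file names, row counts and column name below are made up for illustration):

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

files = ["a.tsv", "b.tsv", "c.tsv"]   # hypothetical files
lengths = [1000, 1500, 1200]          # known (a priori) row counts per file

# Start offset of each file in the global RangeIndex
offsets = np.concatenate([[0], np.cumsum(lengths)[:-1]])

def read_with_index(path, offset, length):
    df = pd.read_csv(path, sep="\t")
    df.index = pd.RangeIndex(offset, offset + length)
    return df

# divisions gives Dask the index boundaries of each partition
divisions = list(offsets) + [sum(lengths) - 1]
ddf = dd.from_map(read_with_index, files, offsets, lengths, divisions=divisions)
```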