Dask read_csv() multiple files but separate partition for each file

Hi everyone, my question is basically what’s in the title.

I originally started with a single massive .csv file containing genetic mutation IDs (chromosome + position + alleles, e.g., chr1-10000-A-G) per individual ID, with (1 individual ID + 1 mutation ID) per row. As a single file, len(list(my_dask_df.partitions)) returns 1.

After reading the API reference for dask.dataframe.read_csv, I noticed the blocksize argument, whose description ends with the following line:

If None, a single block is used for each file.

Based on this, I split my original file by chromosome, ending up with 23 files of varying row counts (going from my_file.csv to my_file_chr{1,2,...,23}.csv). The reason for this split is that mutations are biologically grouped by chromosome, so any Dask DataFrame-level operations I perform would be both accurate & optimized.

I thought that by providing a list of file paths & setting blocksize=None, read_csv would chunk the data such that the rows for each chromosome would end up in their own partition.

However, this didn’t happen; len(list(my_dask_df.partitions)) still returns 1.

Could anyone let me know where I went wrong with this? I’m fine with either:

  • reading a list of files such that each file ends up in its own partition, or
  • correctly repartitioning a single-partition Dask DataFrame by the values in a single column (see the rough sketch below).
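
To illustrate what I mean by the second option, I have something roughly like this in mind (the "chromosome" column is purely illustrative, it isn't in my file as-is, & I haven't got this working):

# Illustrative sketch only: assuming a "chromosome" column exists (or has been
# derived from the mutation ID), set_index shuffles the data so that rows for
# the same chromosome end up together. npartitions is only a target; Dask picks
# the actual division boundaries from the index values.
my_dask_df = my_dask_df.set_index("chromosome", npartitions=23)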

Any help with this would be greatly appreciated!

Hi @ama249, welcome to the Dask community!

How large is your original CSV file? If it is large enough, it should automatically be cut into several partitions.

In any case, there should be at least one partition per file, so based on what you say, you should have 23 partitions.

Could you provide the code you are using to read your data, or even better a reproducer? I just tried with two small CSV files and ended up with 2 partitions, as expected.
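
Here is roughly what I tried (file names and contents are just placeholders):

import pandas as pd
from dask import dataframe as ddf

# Two tiny CSV files; with blocksize=None each file becomes exactly one block
pd.DataFrame({"a": [1, 2], "b": [3, 4]}).to_csv("file1.csv", index=False)
pd.DataFrame({"a": [5, 6], "b": [7, 8]}).to_csv("file2.csv", index=False)

df = ddf.read_csv(["file1.csv", "file2.csv"], blocksize=None)
print(df.npartitions)  # -> 2, one partition per file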

Hi @guillaumeeb, the original my_file.csv file was about 6 GB, & after I split it manually by chromosome (i.e., my_file_chr{1,2,...,23}.csv), their sizes range from about 700 MB down to 4 MB.

The code for the original file was:

from dask import dataframe as ddf

my_dask_df = ddf.read_csv("/path/to/my_file.csv", blocksize="default", sep=",")

print(f"# partitions = {len(list(my_dask_df.partitions))}")

This would return # partitions = 1.

The code for the split files is:

from dask import dataframe as ddf

files_list = [f"/path/to/my_file_chr{i}.csv" for i in range(1, 23+1)]
my_dask_df = ddf.read_csv("/path/to/my_file.csv", blocksize=None, sep=",")

print(f"# partitions = {len(list(my_dask_df.partitions))}")

This also returns # partitions = 1.

Dask's default behavior should split your file into partitions of about 64 MB, so around a hundred partitions in your case. I don't see anything in this simple code that would leave you with a single partition. Note that you can also get the number of partitions directly from the npartitions attribute of the DataFrame.
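
For example:

print(my_dask_df.npartitions)  # equivalent to len(list(my_dask_df.partitions)), without building the list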

In the second part of your code, there is probably some mistake in the sample you posted: you build files_list but then pass a single file path to read_csv instead of the list.

Anyway, you can also use wildcards:
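
# Something like this (pattern based on your chr-split file names); with
# blocksize=None this still gives at least one partition per matched file:
my_dask_df = ddf.read_csv("/path/to/my_file_chr*.csv", blocksize=None, sep=",")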

And the number of partitions should be at least 23.

Hi @guillaumeeb, you’re right, there was a slight mistake in my example; what I actually do is pass a list of string paths to ddf.read_csv().

As for my actual question: after recreating the conda environment with just Python, NumPy, pandas & Dask installed, I can no longer reproduce my original error. It works as per the docs & your explanation, i.e., one partition per sub-file.

I doubt I can figure out why that is, as this is a cloud-based machine I’ve been given external access to. So for now, I’ll mark this as resolved. Thank you very much for all your help!
