Using all cores on a large VM (112 cores / 1 TB RAM) when running groupby on a Dask DataFrame

Hi, I have a question about the optimal/correct use of Dask. I need to run a groupby on a Dask DataFrame of approximately 1B rows (~24 columns). The raw data are stored as many CSVs (each about 50-100M rows), and I currently read them into memory (this VM has 1 TB). My goal is to write the grouped CSVs to disk in parallel (the VM has 112 cores), because I have to repeat this task for many different ~1B-row datasets and need timely processing. I believe I am doing something wrong, because processor usage is low (~1%) during the groupby.

from pathlib import Path
import dask.dataframe as dd

# MONTHS, path_to_input_csvs, DFCOLS, dtype, convert_partition, EXAMPLE and write_to_csv are defined elsewhere
for month in MONTHS:
    csv_wildcard_path = Path(path_to_input_csvs) / f"{month}-*.csv"
    gdf = dd.read_csv(str(csv_wildcard_path), usecols=DFCOLS, dtype=dtype, assume_missing=True)  # about 1B rows (~400 GB)
    gdf = gdf.map_partitions(convert_partition, meta=EXAMPLE).persist()
    gdf.groupby(["ID"]).apply(write_to_csv, meta=gdf._meta).compute()  # only uses 1 core despite the 112 available

One note that may or may not be relevant: the convert_partition function above has this signature:

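def convert_partition(df: pd.DataFrame) -> sp.GeoDataFrame:
    ...  # body not shown; map_partitions passes it one pandas partition (sp is presumably spatialpandas)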

I have tried wrapping this in a context manager with scheduler="processes", but the groupby still appears to use only one core. I know that writing to CSV is not optimal (Parquet is preferred), but I am required to write CSV, and that is not the current bottleneck in the code; the groupby operation is.

Is there a reason why you’re doing an apply instead of using to_csv?

Within the write_to_csv method there were a few extra processing steps. I didn’t realize that would cause issues. Thanks for the suggestion! I will attempt a refactor to allow for the direct call to to_csv.

Last time I checked, to_csv and read_csv hold the GIL. Even in workflows that release the GIL perfectly, you will still hit many bottlenecks with that many threads in a single process.

scheduler="processes" is not peer-to-peer, meaning that all data goes through your main process, which becomes your bottleneck.
Try starting a LocalCluster with multiple processes and between 1 and 8 threads per process:

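import distributed

if __name__ == "__main__":  # guard needed because the Client starts worker processes
    client = distributed.Client(n_workers=28, threads_per_worker=4)  # 28 processes x 4 threads = 112 cores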
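A sketch of how that layout maps onto the VM from the question, assuming the aim is n_workers * threads_per_worker = 112 with memory split evenly between worker processes. plan_workers and start_client are hypothetical helpers; n_workers, threads_per_worker and memory_limit are real LocalCluster parameters. LocalCluster's default memory_limit="auto" already divides system memory across workers, so an explicit value only matters if you want to leave headroom for the OS; the 900 GB figure below is an assumption.

```python
import distributed


def plan_workers(total_cores: int, threads_per_worker: int, total_memory_gb: float) -> dict:
    """Hypothetical helper: split a machine into LocalCluster worker processes."""
    if total_cores % threads_per_worker:
        raise ValueError("threads_per_worker should divide total_cores evenly")
    n_workers = total_cores // threads_per_worker
    return {
        "n_workers": n_workers,
        "threads_per_worker": threads_per_worker,
        # memory_limit is per worker and accepts strings such as "32GB"
        "memory_limit": f"{total_memory_gb / n_workers:.0f}GB",
    }


def start_client(cfg: dict) -> distributed.Client:
    """Start a LocalCluster from a plan; call only under an if __name__ == "__main__": guard."""
    cluster = distributed.LocalCluster(**cfg)
    return distributed.Client(cluster)
```

In a script, call start_client(plan_workers(112, 4, 900)) under the same __main__ guard, then run the read, shuffle and write steps as before; the helper also makes it easy to compare 1, 2, 4 or 8 threads per worker.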