Hi, I have a question regarding the optimal/correct use of dask. I need to run a groupby on a dask dataframe that is approximately 1B rows (~24 columns). The raw data are stored as many CSVs (each about 50-100M rows), and I currently read those into memory (this VM has 1 TB of memory). My goal is to write the grouped CSVs to disk in parallel (the VM has 112 cores), because I have to repeat this task for many different ~1B-row datasets and need timely processing. I believe what I am doing is not correct, because my processor usage is low (~1%) during the groupby.
import dask.dataframe as dd
from pathlib import Path

for month in MONTHS:
    csv_wildcard_path = Path(path_to_input_csvs) / f"{month}-*.csv"
    # About 1B rows (roughly 400 GB) per month
    gdf = dd.read_csv(csv_wildcard_path, usecols=DFCOLS, dtype=dtype, assume_missing=True)
    gdf = gdf.map_partitions(convert_partition, meta=EXAMPLE).persist()
    # This only uses 1 core despite all of the available (112) processors
    gdf.groupby(["ID"]).apply(write_to_csv, meta=gdf._meta).compute()
One note that may or may not be relevant: the signature of the convert_partition function used above is

def convert_partition(df: dd) -> sp.GeoDataFrame:
I have tried wrapping this in a context manager with scheduler="processes", but the groupby operation still appears to use only one core. Note that I know writing to CSV is not optimal (parquet would be preferred), but I have a requirement to write CSV, and that is not the current bottleneck in the code; the groupby operation is the bottleneck.
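For reference, the context-manager attempt looked roughly like this (a minimal sketch; the exact form may have differed slightly, and it reuses gdf and write_to_csv from the snippet above):

import dask

# Force the multiprocessing scheduler just for this compute; gdf and
# write_to_csv are defined as in the snippet above.
with dask.config.set(scheduler="processes"):
    gdf.groupby(["ID"]).apply(write_to_csv, meta=gdf._meta).compute()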