Understanding partitions, groupby, and memory usage

Hi all, I have 1,500 dataframes stored in Parquet format on disk. Each dataframe has three columns: group, record_id, and weight, which are string, integer (though it could be treated as categorical), and float, respectively. Each dataframe uses about 500 MB of memory when I load it with pandas and check DataFrame.memory_usage().

The system I am working on has 250 GB of RAM and 64 cores.

Across the dataframes there are a lot of duplicate groups that also contain duplicate record IDs, so I’d like to group by group and record_id to compute a few stats: the count of all records in each group + record_id combination, the average weight (mean_weight), and a weighted_count, which is simply the product of count and mean_weight.
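To make that concrete, here’s a toy pandas version of the aggregation I’m after (the values are made up):

import pandas as pd

df = pd.DataFrame(
    {
        "group": ["a", "a", "b"],
        "record_id": [1, 1, 2],
        "weight": [0.5, 1.5, 2.0],
    }
)

agg = (
    df.groupby(["group", "record_id"])
    .agg(count=("weight", "count"), mean_weight=("weight", "mean"))
    .reset_index()
)
agg["weighted_count"] = agg["count"] * agg["mean_weight"]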

I’m really struggling to keep the memory burden under control during this process, so I’m tweaking the partitions after reading the data, as well as the number of partitions that the GroupBy.aggregate method produces, but I can’t keep the memory load below a desired limit (I don’t want to use all of the memory on the system for the entire processing time).

In the following code, I’ve taken two opportunities to repartition: first, I explicitly call DataFrame.repartition after reading the 1,500 dataframes. For the number of partitions, I originally chose 7,500, which works out to roughly 100 MB per chunk, but as I keep running out of memory I’ve gradually increased it to 8,960.
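For reference, the back-of-the-envelope arithmetic behind the original choice:

total_gb = 0.5 * 1500                          # ~500 MB per dataframe, 1,500 files
target_partition_gb = 0.1                      # aim for ~100 MB partitions
npartitions = total_gb / target_partition_gb   # = 7,500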

After the groupby operation, I chose 5,120 partitions for split_out, since the group + record_id combinations will still have high cardinality, even though the overall size of the data should decrease fairly significantly.

With these partition settings, however, the code is still on track to take ~12 hours, with memory usage at 99-100%. At this point, time isn’t so much of an issue; I just want to know what I can do to ensure that I can leave the thing running without encountering memory errors (or causing them for anyone else on the machine).

I want to love Dask but I still feel like I’m missing some pieces when it comes to comfortably utilizing it. Any help would be much appreciated!

from pathlib import Path

import dask.dataframe as dd
from dask.diagnostics.progress import ProgressBar

ddf = (
    dd.read_parquet(
        list(Path("path/to/dataframes").glob("*.parquet")), dtype_backend="pyarrow"
    )
    .astype(
        {
            "group": "string[pyarrow]",
            "record_id": "category",
            "weight": "float32",
        }
    )
    .repartition(npartitions=8960)   # Originally ~7500: 0.5 GB * 1500 files / 0.1 GB target per partition
)

result = (
    ddf.groupby(
        ["group", "record_id"], observed=True, sort=False
    )
    .aggregate(
        count=("weight", "count"),
        mean_weight=("weight", "mean"),
        split_out=5120,  # Aggregated data should be smaller, but still shows high cardinality in groups
        shuffle_method="tasks",
    )
    .reset_index()
)

result = result.assign(weighted_count=result["count"] * result["mean_weight"]).astype(
    {
        "group": "string[pyarrow]",
        "record_id": "category",
        "count": "int16",
        "weighted_count": "float32",
    }
)
result = result.drop(columns=["mean_weight"])

with ProgressBar():
    result.to_parquet(
        "path/to/file.parquet",
        engine="pyarrow",
        compression="zstd",
        compression_level=15,  # extra kwargs are passed through to the pyarrow backend
    )

Hi @lusk, welcome to Dask Community!

Your problem definitely sounds like something Dask should be able to solve.

First, are you using a distributed scheduler? If not, I really think creating a LocalCluster and associated Client could help you: both to keep memory under a specific threshold (using memory_limit and specifying the number of workers), and to monitor what might be the problem using the associated Dashboard.
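For example, a minimal sketch (the worker count and memory limit here are only illustrative, to be adapted to your 64 cores and 250 GB budget):

from dask.distributed import Client, LocalCluster

# Illustrative sizing only: 16 workers x 4 threads = 64 cores,
# and 16 workers x 12 GiB = 192 GiB, leaving headroom on a 250 GB machine.
cluster = LocalCluster(n_workers=16, threads_per_worker=4, memory_limit="12GiB")
client = Client(cluster)
print(client.dashboard_link)  # open this URL to monitor memory and task progress

Note that the dask.diagnostics ProgressBar only works with the local schedulers; with a distributed Client you would follow progress on the Dashboard (or with distributed.progress) instead.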

A few other thoughts:

I wouldn’t repartition after reading, as repartitioning can be expensive and Dask should use sensible defaults. If you want to change the defaults, though, you should specify the size of the blocks/partitions you want in the read_parquet call.
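A minimal sketch, assuming a recent Dask version where read_parquet exposes a blocksize argument (the 100 MB target is just an example):

import dask.dataframe as dd

# Let read_parquet build ~100 MB partitions directly, instead of repartitioning afterwards.
ddf = dd.read_parquet(
    "path/to/dataframes/*.parquet",
    dtype_backend="pyarrow",
    blocksize="100MB",
)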

I’m not completely sure about the split_out kwarg; if you have a big number of different group + record_id combinations, then I don’t think this is useful.

Also, in the code example, instead of using astype, I would recommend setting dtypes in the read_parquet call, to avoid a conversion afterwards.
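For instance, a sketch under the assumption that record_id is dictionary-encoded in the Parquet files (which is what the categories argument relies on); the float32 cast would still need an astype unless the files already store weight as float32:

import dask.dataframe as dd

ddf = dd.read_parquet(
    "path/to/dataframes/*.parquet",
    columns=["group", "record_id", "weight"],  # read only the columns that are needed
    categories=["record_id"],                  # build record_id as a categorical at read time
    dtype_backend="pyarrow",                   # Arrow-backed strings for "group"
)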

With all that, I would launch the computation and follow its execution through the Dashboard, see if the worker memory is being overloaded at some point, and try to identify why. Since you’re doing an aggregation, the groupby phase should be able to stream the results quite well, but it might depend on how your input dataset is balanced between group and record_id.