Hi all, I have 1,500 dataframes stored in Parquet format on disk. Each dataframe has three columns: group (string), record_id (integer, though it could be categorical), and weight (float). Each dataframe uses about 500 MB of memory when I load it with pandas and check DataFrame.memory_usage().
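For reference, this is roughly how I'm checking the per-file footprint (the file name below is just a placeholder):

import pandas as pd

# Load one of the 1,500 files (placeholder name) and inspect its dtypes and size.
df = pd.read_parquet("path/to/dataframes/example.parquet")
print(df.dtypes)
print(df.memory_usage(deep=True).sum() / 1e6)  # roughly 500 MB per file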
The system I am working on has 250 GB of RAM and 64 cores.
Across the dataframes there are a lot of duplicate groups that also contain duplicate record IDs, so I'd like to group by group and record_id and compute a few stats: the count of all records in each "group + record_id" combination, the average weight (mean_weight), and a weighted_count, which is simply the product of count and mean_weight.
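To make the target output concrete, here's a toy pandas version of the same aggregation (the values are made up):

import pandas as pd

# Tiny made-up frame with a duplicated (group, record_id) pair.
toy = pd.DataFrame(
    {
        "group": ["a", "a", "b"],
        "record_id": [1, 1, 2],
        "weight": [0.5, 1.5, 2.0],
    }
)

summary = (
    toy.groupby(["group", "record_id"])
    .agg(count=("weight", "count"), mean_weight=("weight", "mean"))
    .reset_index()
)
summary["weighted_count"] = summary["count"] * summary["mean_weight"]
print(summary)  # ("a", 1) -> count 2, mean_weight 1.0, weighted_count 2.0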
I'm really struggling to keep the memory burden under control while doing this, so I'm tweaking the number of partitions after reading the data as well as the number of output partitions the GroupBy.aggregate call produces, but I can't keep memory usage below a reasonable limit (I don't want to occupy all of the memory on the system for the entire processing time).
In the following code I've taken two opportunities to repartition. First, I explicitly call dask.DataFrame.repartition when reading the 1,500 dataframes. I originally chose 7,500 partitions so that each chunk would be around 100 MB, but since I kept running out of memory I've gradually increased it to 8,960.
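For clarity, the arithmetic behind the original 7,500 figure:

# ~500 MB in memory per file, 1,500 files, aiming for ~100 MB partitions
total_mb = 1500 * 500
target_partition_mb = 100
print(total_mb / target_partition_mb)  # -> 7500.0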
Second, after the groupby operation I set split_out to 5,120 partitions, since the groups will still have high cardinality, even though the aggregated data should be considerably smaller.
With these partition settings, however, the code is still on track to take ~12 hours with memory usage at 99-100%. At this point time isn't really the issue; I just want to be sure I can leave the job running without hitting memory errors (or causing them for anyone else on the machine).
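For context, this is the kind of explicit cap I'd like to end up with; a minimal sketch of a LocalCluster with per-worker memory limits (the worker count and limit here are just illustrative, not what I'm currently running):

from dask.distributed import Client, LocalCluster

# Hypothetical sizing: 16 workers x 12 GB stays well under the machine's 250 GB.
cluster = LocalCluster(n_workers=16, threads_per_worker=2, memory_limit="12GB")
client = Client(cluster)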
I want to love Dask but I still feel like I’m missing some pieces when it comes to comfortably utilizing it. Any help would be much appreciated!
from pathlib import Path

import dask.dataframe as dd
from dask.diagnostics.progress import ProgressBar

# Read all 1,500 files, cast to compact dtypes, and split into smaller partitions.
ddf = (
    dd.read_parquet(
        list(Path("path/to/dataframes").glob("*.parquet")), dtype_backend="pyarrow"
    )
    .astype(
        {
            "group": "string[pyarrow]",
            "record_id": "category",
            "weight": "float32",
        }
    )
    # Originally ~7500 partitions: 0.5 GB/file * 1500 files / 0.1 GB target partition size
    .repartition(npartitions=8960)
)

# Count records and average the weights per (group, record_id) pair.
result = (
    ddf.groupby(["group", "record_id"], observed=True, sort=False)
    .aggregate(
        count=("weight", "count"),
        mean_weight=("weight", "mean"),
        split_out=5120,  # Aggregated data should be smaller, but groups still have high cardinality
        shuffle_method="tasks",
    )
    .reset_index()
)

# weighted_count = count * mean_weight; mean_weight itself is dropped afterwards.
result = result.assign(weighted_count=result["count"] * result["mean_weight"]).astype(
    {
        "group": "string[pyarrow]",
        "record_id": "category",
        "count": "int16",
        "weighted_count": "float32",
    }
)
result = result.drop(columns=["mean_weight"])
with ProgressBar():
    result.to_parquet(
        "path/to/file.parquet",
        engine="pyarrow",
        compression="zstd",
        compression_level=15,  # passed through to the pyarrow backend
    )
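For completeness, this is how I'd plan to sanity-check the output once the job finishes (same placeholder path as above):

check = dd.read_parquet("path/to/file.parquet")
print(check.npartitions)
print(check.head())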