I have a Dask script that I run from the command line on my local machine, a MacBook M1 with 8 GB of RAM.
The script takes the path to a dataset as an argument and runs some benchmark queries against it.
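For context, I invoke it roughly like this (the script name and dataset path below are placeholders, not the real ones):

    python dask_benchmark.py /path/to/h2o_data.parquet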
Here’s the basic structure of the script:
import dask.dataframe as dd
import pandas as pd
import dask
from helpers import benchmark, get_results
from dask_h2o_groupby_queries import *
import sys
from dask.distributed import Client, wait
print("dask version: %s" % dask.__version__)
parquet_path = sys.argv[1]

if __name__ == "__main__":
    # Start a local cluster and client with all default settings
    client = Client()

    dask_parquet_benchmarks = {
        "duration": [],
        "task": [],
    }

    def q1(ddf):
        return ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

    # Read only the two columns the query needs and keep the partitions in worker memory
    ddf1 = dd.read_parquet(
        parquet_path, columns=["id1", "v1"], engine="pyarrow"
    ).persist()
    wait(ddf1)

    benchmark(q1, df=ddf1, benchmarks=dask_parquet_benchmarks, name="q1")
    del ddf1
When I run the script on the 0.5 GB h2o dataset, I get thousands of warnings like these:
2022-07-18 17:59:58,932 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see the Worker Memory Management page of the Dask.distributed documentation for more information. – Unmanaged memory: 1.50 GiB – Worker memory limit: 2.00 GiB
2022-07-18 17:59:58,934 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see the Worker Memory Management page of the Dask.distributed documentation for more information. – Unmanaged memory: 1.50 GiB – Worker memory limit: 2.00 GiB
When I run the script on the 1e8-row dataset (about 5 GB on disk), the workers crash and the script never finishes.
Any suggestions on how to run this script more reliably and how to deal with all of these warnings?
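For what it's worth, I suspect the default Client() is part of the problem: judging by the 2.00 GiB limit in the warnings, it seems to split the 8 GB across several small workers. This is the kind of explicit configuration I have been considering, but I am not sure which values make sense on this machine (the worker count, thread count, and memory limit below are guesses, not tested values):

    import logging
    from dask.distributed import Client

    # Guessed values: fewer workers, each with a larger share of the 8 GB of RAM
    client = Client(
        n_workers=2,            # instead of whatever the default picks
        threads_per_worker=2,
        memory_limit="3GiB",    # per-worker limit; placeholder value
    )

    # Silence the unmanaged-memory warnings by raising the level of the logger
    # that emits them (its name comes from the warning lines above)
    logging.getLogger("distributed.worker_memory").setLevel(logging.ERROR)

Is something like this the right direction, or is there a better way to size the cluster for this machine?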